diff --git a/google-cloud-pubsub-v1/.owlbot.rb b/google-cloud-pubsub-v1/.owlbot.rb new file mode 100644 index 000000000000..b613edf662c4 --- /dev/null +++ b/google-cloud-pubsub-v1/.owlbot.rb @@ -0,0 +1,25 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content| + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content| + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content| + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" +end + +OwlBot.move_files diff --git a/google-cloud-pubsub/Gemfile b/google-cloud-pubsub/Gemfile index f3bbfc6b53f7..6c96ff69ee64 100644 --- a/google-cloud-pubsub/Gemfile +++ b/google-cloud-pubsub/Gemfile @@ -35,3 +35,4 @@ gem "retriable", "~> 3.1.2" gem "simplecov", "~> 0.22" gem "yard", "~> 0.9.37" gem "yard-doctest", "~> 0.1.17" +gem "google-logging-utils", "~> 0.3.0" diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 080d33a1cf61..fc1b3d7e32ab 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -92,6 +92,57 @@ module GRPC end ``` +### Enabling library level logging + +This library includes an opt-in logging mechanism that provides detailed information about high-level operations. These logs are useful for troubleshooting and monitoring the client's behavior. When enabled, logs are tagged with subtags to indicate the operation type. + +The following subtags are used: + +* `callback-delivery`: Logs when a message is delivered to the user-provided callback. +* `callback-exceptions`: Logs any exceptions raised from the user callback. +* `ack-nack`: Logs when a message is acknowledged (`ack`) or negatively acknowledged (`nack`). +* `ack-batch`: Logs the reason and size of acknowledgement batches sent to the server. +* `publish-batch`: Logs the reason and size of message batches sent to the server for publishing. +* `expiry`: Logs when a message's lease expires and it is dropped from client-side lease management. +* `subscriber-streams`: Logs key events in the subscriber's streaming connection, such as opening, closing, and errors. +* `subscriber-flow-control`: Logs when the subscriber's client-side flow control is paused or resumed. + +**WARNING:** These logs may contain message data in plaintext, which could include sensitive information. Ensure you are practicing good data hygiene with your application logs. It is recommended to enable this logging only for debugging purposes and not permanently in production. + +To enable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to `all`. +To enable logging for just the google-cloud-pubsub gem, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to a comma separated string that contains `pubsub` +To disable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` to `none` + +```sh +export GOOGLE_SDK_RUBY_LOGGING_GEMS=pubsub +``` + +You can programmatically configure a custom logger. The logger can be set globally for the Pub/Sub library, or provided on a per-client basis. + +To set a logger globally, configure it on the `Google::Cloud` configuration object: + +```ruby +require "google/cloud/pubsub" +require "logger" + +# Configure a global logger for the pubsub library +Google::Cloud.configure.pubsub.logger = Logger.new "my-app.log" +``` + +Alternatively, you can provide a logger directly to the `PubSub` client initializer. If a logger instance is provided, it will override any globally configured logger. + +```ruby +require "google/cloud/pubsub" +require "logger" + +# Provide a logger directly to the client +custom_logger = Logger.new "pubsub-client.log" +pubsub = Google::Cloud::PubSub.new logger: custom_logger +``` + +If no custom logger is configured, a default logger that writes to standard output will be used. + + ## Supported Ruby Versions This library is supported on Ruby 3.1+. diff --git a/google-cloud-pubsub/lib/google-cloud-pubsub.rb b/google-cloud-pubsub/lib/google-cloud-pubsub.rb index da6899323b9a..1d9f1b2e3cd4 100644 --- a/google-cloud-pubsub/lib/google-cloud-pubsub.rb +++ b/google-cloud-pubsub/lib/google-cloud-pubsub.rb @@ -23,6 +23,7 @@ require "google/cloud" unless defined? Google::Cloud.new require "google/cloud/config" require "googleauth" +require "logger" module Google module Cloud @@ -142,6 +143,8 @@ def self.pubsub project_id = nil, "https://www.googleapis.com/auth/pubsub" ] + default_logger = Logger.new $stdout + config.add_field! :project_id, default_project, match: String, allow_nil: true config.add_alias! :project, :project_id config.add_field! :credentials, default_creds, match: [String, Hash, Google::Auth::Credentials], allow_nil: true @@ -153,4 +156,5 @@ def self.pubsub project_id = nil, config.add_field! :on_error, nil, match: Proc config.add_field! :endpoint, nil, match: String config.add_field! :universe_domain, nil, match: String + config.add_field! :logger, default_logger, match: Logger, allow_nil: true end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub.rb b/google-cloud-pubsub/lib/google/cloud/pubsub.rb index 0963999262a7..1c4eead30166 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub.rb @@ -14,6 +14,7 @@ require "google-cloud-pubsub" +require "google/cloud/pubsub/logging" require "google/cloud/pubsub/project" require "google/cloud/config" require "google/cloud/env" @@ -70,11 +71,16 @@ module PubSub # # * `https://www.googleapis.com/auth/pubsub` # @param [Numeric] timeout Default timeout to use in requests. Optional. + # @param [String] universe_domain A custom universe domain. Optional. # @param [String] endpoint Override of the endpoint host name. Optional. # If the param is nil, uses the default endpoint. # @param [String] emulator_host Pub/Sub emulator host. Optional. # If the param is nil, uses the value of the `emulator_host` config. - # @param universe_domain [String] A custom universe domain. Optional. + # @param [Logger] logger Optional Logger instance for emitting + # library-level debug logs. If not provided, it will default to + # configure.logger, which defaults to Logger.new STDOUT if not set. To + # enable logging, set environment variable GOOGLE_SDK_RUBY_LOGGING_GEMS + # to "all" or a comma separated list of gem names, including "pubsub". # # @return [Google::Cloud::PubSub::Project] # @@ -92,13 +98,15 @@ def self.new project_id: nil, timeout: nil, universe_domain: nil, endpoint: nil, - emulator_host: nil + emulator_host: nil, + logger: nil project_id ||= default_project_id scope ||= configure.scope timeout ||= configure.timeout endpoint ||= configure.endpoint universe_domain ||= configure.universe_domain emulator_host ||= configure.emulator_host + logger ||= configure.logger if emulator_host credentials = :this_channel_is_insecure @@ -114,10 +122,12 @@ def self.new project_id: nil, project_id = project_id.to_s # Always cast to a string raise ArgumentError, "project_id is missing" if project_id.empty? + logging = Google::Cloud::PubSub::Logging.create logger service = PubSub::Service.new project_id, credentials, host: endpoint, timeout: timeout, - universe_domain: universe_domain + universe_domain: universe_domain, + logging: logging PubSub::Project.new service end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb index 67becf27b7b0..161103b546bc 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb @@ -157,7 +157,7 @@ def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &cal end batch_action = batch.add msg, callback if batch_action == :full - publish_batches! + publish_batches! reason: "batch full" elsif @published_at.nil? # Set initial time to now to start the background counter @published_at = Time.now @@ -180,7 +180,7 @@ def stop break if @stopped @stopped = true - publish_batches! stop: true + publish_batches! stop: true, reason: "shutdown" @cond.signal @publish_thread_pool.shutdown end @@ -234,7 +234,7 @@ def stop! timeout = nil # @return [AsyncPublisher] returns self so calls can be chained. def flush synchronize do - publish_batches! + publish_batches! reason: "manual flush" @cond.signal end @@ -313,7 +313,7 @@ def run_background time_since_first_publish = Time.now - @published_at if time_since_first_publish > @interval # interval met, flush the batches... - publish_batches! + publish_batches! reason: "interval timeout" @cond.wait else # still waiting for the interval to publish the batch... @@ -347,28 +347,28 @@ def stop_publish ordering_key, err end end - def publish_batches! stop: nil + def publish_batches! stop: nil, reason: "unknown" @batches.reject! { |_ordering_key, batch| batch.empty? } @batches.each_value do |batch| ready = batch.publish! stop: stop - publish_batch_async @topic_name, batch if ready + publish_batch_async @topic_name, batch, reason: reason if ready end # Set published_at to nil to wait indefinitely @published_at = nil end - def publish_batch_async topic_name, batch + def publish_batch_async topic_name, batch, reason: "unknown" # TODO: raise unless @publish_thread_pool.running? return unless @publish_thread_pool.running? Concurrent::Promises.future_on( - @publish_thread_pool, topic_name, batch - ) { |t, b| publish_batch_sync t, b } + @publish_thread_pool, topic_name, batch, reason + ) { |t, b, r| publish_batch_sync t, b, reason: r } end # rubocop:disable Metrics/AbcSize - def publish_batch_sync topic_name, batch + def publish_batch_sync topic_name, batch, reason: "unknown" # The only batch methods that are safe to call from the loop are # rebalance! and reset! because they are the only methods that are # synchronized. @@ -379,6 +379,7 @@ def publish_batch_sync topic_name, batch grpc = @service.publish topic_name, items.map(&:msg), compress: compress && batch.total_message_bytes >= compression_bytes_threshold + service.logging.log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize) items.zip Array(grpc.message_ids) do |item, id| @flow_controller.release item.bytesize next unless item.callback diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb index 613312619978..94cd0505cd1c 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb @@ -35,6 +35,7 @@ module PubSub # end # class BatchPublisher + ## # @private The messages to publish attr_reader :messages @@ -117,10 +118,11 @@ def to_gcloud_messages message_ids ## # @private Call the publish API with arrays of data and attrs. - def publish_batch_messages topic_name, service + def publish_batch_messages topic_name, service, reason: "unknown" grpc = service.publish topic_name, messages, compress: compress && total_message_bytes >= compression_bytes_threshold + service.logging.log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes to_gcloud_messages Array(grpc.message_ids) end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb new file mode 100644 index 000000000000..9cf3299a6242 --- /dev/null +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +require "logger" +require "google/logging/google_sdk_logger_delegator" +require "google/cloud/config" + +module Google + module Cloud + module PubSub + ## + # @private + class Logging + LOG_NAME = "pubsub".freeze + VALID_LOG_LEVELS = [:debug, :info, :warn, :error, :fatal].freeze + private_constant :VALID_LOG_LEVELS, :LOG_NAME + + ## + # @private + def self.create logger + new logger + end + + ## + # @private + # rubocop:disable Naming/BlockForwarding + def log level, subtag, &message_block + return unless VALID_LOG_LEVELS.include?(level) && block_given? + @logging_delegator.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block) + end + # rubocop:enable Naming/BlockForwarding + + ## + # @private + def log_batch logger_name, reason, type, num_messages, total_bytes + log :info, logger_name do + "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" + end + end + + ## + # @private + def log_ack_nack ack_ids, type + ack_ids.each do |ack_id| + log :info, "ack-nack" do + "message (ackID #{ack_id}) #{type}" + end + end + end + + ## + # @private + def log_expiry expired + expired.each do |ack_id, item| + log :info, "expiry" do + "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout" + end + end + end + + private + + def initialize logger + @logging_delegator = Google::Logging::GoogleSdkLoggerDelegator.new LOG_NAME, logger + end + end + end + end +end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb index 814ed067ee7d..9669d5e621a5 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb @@ -14,6 +14,7 @@ require "google/cloud/pubsub/service" +require "google/cloud/pubsub/subscriber" require "google/cloud/pubsub/message_listener/stream" require "google/cloud/pubsub/message_listener/timed_unary_buffer" require "monitor" diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index ca2333d76c1c..950f528d8a66 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - require "monitor" module Google @@ -22,9 +21,9 @@ class MessageListener ## # @private class Inventory - InventoryItem = Struct.new :bytesize, :pulled_at do + InventoryItem = Struct.new :message_id, :bytesize, :pulled_at do def self.from rec_msg - new rec_msg.to_proto.bytesize, Time.now + new rec_msg.message.message_id, rec_msg.to_proto.bytesize, Time.now end end @@ -70,18 +69,24 @@ def add *rec_msgs def remove *ack_ids ack_ids.flatten! ack_ids.compact! - return if ack_ids.empty? + return {} if ack_ids.empty? + removed_items = {} synchronize do - @inventory.delete_if { |ack_id, _| ack_ids.include? ack_id } + removed, keep = @inventory.partition { |ack_id, _| ack_ids.include? ack_id } + @inventory = keep.to_h + removed_items = removed.to_h @wait_cond.broadcast end + removed_items end def remove_expired! synchronize do extension_time = Time.new - extension - @inventory.delete_if { |_ack_id, item| item.pulled_at < extension_time } + expired, keep = @inventory.partition { |_ack_id, item| item.pulled_at < extension_time } + @inventory = keep.to_h + stream.subscriber.service.logging.log_expiry expired @wait_cond.broadcast end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index f7e0277d0f7f..846b61b5949b 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - require "google/cloud/pubsub/message_listener/sequencer" require "google/cloud/pubsub/message_listener/enumerator_queue" require "google/cloud/pubsub/message_listener/inventory" @@ -73,7 +72,12 @@ def initialize subscriber execution_interval: 30 ) do # push empty request every 30 seconds to keep stream alive - push Google::Cloud::PubSub::V1::StreamingPullRequest.new unless inventory.empty? + unless inventory.empty? + subscriber.service.logging.log :info, "subscriber-streams" do + "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + end + push Google::Cloud::PubSub::V1::StreamingPullRequest.new + end end.execute end @@ -93,6 +97,9 @@ def stop synchronize do break if @stopped + subscriber.service.logging.log :info, "subscriber-streams" do + "stopping stream for subscription #{@subscriber.subscription_name}" + end # Close the stream by pushing the sentinel value. # The unary pusher does not use the stream, so it can close here. @request_queue&.push self @@ -138,8 +145,9 @@ def acknowledge *messages, &callback ack_ids = coerce_ack_ids messages return true if ack_ids.empty? + removed_items = {} synchronize do - @inventory.remove ack_ids + removed_items = @inventory.remove ack_ids @subscriber.buffer.acknowledge ack_ids, callback end @@ -152,8 +160,9 @@ def modify_ack_deadline deadline, *messages, &callback mod_ack_ids = coerce_ack_ids messages return true if mod_ack_ids.empty? + removed_items = {} synchronize do - @inventory.remove mod_ack_ids + removed_items = @inventory.remove mod_ack_ids @subscriber.buffer.modify_ack_deadline deadline, mod_ack_ids, callback end @@ -215,7 +224,13 @@ class RestartStream < StandardError; end def background_run synchronize do # Don't allow a stream to restart if already stopped - return if @stopped + if @stopped + subscriber.service.logging.log :debug, "subscriber-streams" do + "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ + " stopped" + end + return + end @stopped = false @paused = false @@ -233,6 +248,9 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } enum = @subscriber.service.streaming_pull @request_queue.each, options + subscriber.service.logging.log :info, "subscriber-streams" do + "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" + end loop do synchronize do @@ -287,13 +305,23 @@ def background_run stop rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal, GRPC::ResourceExhausted, GRPC::Unauthenticated, - GRPC::Unavailable + GRPC::Unavailable => e + status_code = e.respond_to?(:code) ? e.code : e.class.name + subscriber.service.logging.log :error, "subscriber-streams" do + "Subscriber stream for subscription #{@subscriber.subscription_name} has ended with status " \ + "#{status_code}; will be retried." + end # Restart the stream with an incremental back for a retriable error. - retry rescue RestartStream + subscriber.service.logging.log :info, "subscriber-streams" do + "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." + end retry rescue StandardError => e + subscriber.service.logging.log :error, "subscriber-streams" do + "error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}" + end @subscriber.error! e retry @@ -336,13 +364,22 @@ def perform_callback_async rec_msg return unless callback_thread_pool.running? Concurrent::Promises.future_on( - callback_thread_pool, rec_msg, &method(:perform_callback_sync) + callback_thread_pool, + rec_msg, + &method(:perform_callback_sync) ) end def perform_callback_sync rec_msg + subscriber.service.logging.log :info, "callback-delivery" do + "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" + end @subscriber.callback.call rec_msg unless stopped? rescue StandardError => e + subscriber.service.logging.log :info, "callback-exceptions" do + "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ + "#{e.inspect}" + end @subscriber.error! e ensure release rec_msg @@ -369,6 +406,9 @@ def pause_streaming! return unless pause_streaming? @paused = true + subscriber.service.logging.log :info, "subscriber-flow-control" do + "subscriber for #{@subscriber.subscription_name} is client-side flow control blocked" + end end def pause_streaming? @@ -382,6 +422,9 @@ def unpause_streaming! return unless unpause_streaming? @paused = nil + subscriber.service.logging.log :info, "subscriber-flow-control" do + "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" + end # signal to the background thread that we are unpaused @pause_cond.broadcast end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb index 525bc4746f4f..b2644cff5490 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb @@ -64,7 +64,7 @@ def initialize subscriber, max_bytes: 500_000, interval: 1.0 @retry_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads @task = Concurrent::TimerTask.new execution_interval: interval do - flush! + flush! reason: "interval timeout" end end @@ -108,9 +108,9 @@ def renew_lease deadline, ack_ids true end - def flush! + def flush! reason: "manual flush" # Grab requests from the buffer and release synchronize ASAP - requests = flush_requests! + requests = flush_requests! reason return if requests.empty? # Perform the RCP calls concurrently @@ -167,7 +167,7 @@ def stop @task.shutdown @retry_thread_pool.shutdown @callback_thread_pool.shutdown - flush! + flush! reason: "shutdown" self end @@ -296,7 +296,9 @@ def retry_request ack_ids, modack end end - def flush_requests! + # rubocop:disable Metrics/AbcSize + + def flush_requests! reason prev_reg = synchronize do return {} if @register.empty? @@ -308,16 +310,34 @@ def flush_requests! groups = prev_reg.each_pair.group_by { |_ack_id, delay| delay } req_hash = groups.transform_values { |v| v.map(&:first) } - requests = { acknowledge: [] } + requests = { acknowledge: [], modify_ack_deadline: [] } ack_ids = Array(req_hash.delete(:ack)) # ack has no deadline set - requests[:acknowledge] = create_acknowledge_requests ack_ids if ack_ids.any? + if ack_ids.any? + requests[:acknowledge] = create_acknowledge_requests ack_ids + new_reason = if requests[:acknowledge].length > 1 + "#{reason} and partitioned for exceeding max bytes" + else + reason + end + requests[:acknowledge].each do |req| + @subscriber.service.logging.log_batch "ack-batch", new_reason, "ack", req.ack_ids.length, req.to_proto.bytesize + end + end requests[:modify_ack_deadline] = req_hash.map do |mod_deadline, mod_ack_ids| - create_modify_ack_deadline_requests mod_deadline, mod_ack_ids + mod_ack_reqs = create_modify_ack_deadline_requests mod_deadline, mod_ack_ids + type = mod_deadline.zero? ? "nack" : "modack" + new_reason = mod_ack_reqs.length > 1 ? "#{reason} and partitioned for exceeding max bytes" : reason + mod_ack_reqs.each do |req| + @subscriber.service.logging.log_batch "ack-batch", new_reason, type, req.ack_ids.length, req.to_proto.bytesize + end + mod_ack_reqs end.flatten requests end + # rubocop:enable Metrics/AbcSize + def create_acknowledge_requests ack_ids req = Google::Cloud::PubSub::V1::AcknowledgeRequest.new( subscription: subscription_name, diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb index 43cb7a4b2fc9..0134324ab32f 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb @@ -223,7 +223,8 @@ def publish data = nil, attributes = nil, ordering_key: nil, compress: nil, comp block&.call batch return nil if batch.messages.count.zero? - batch.publish_batch_messages name, service + reason = block_given? ? "synchronous publish multiple" : "synchronous publish single" + batch.publish_batch_messages name, service, reason: reason end ## diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index 344205c0bab3..06faf3bfe30b 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -19,6 +19,7 @@ require "google/cloud/pubsub/version" require "google/cloud/pubsub/v1" require "google/cloud/pubsub/admin_clients" +require "google/cloud/pubsub/logging" require "securerandom" module Google @@ -27,6 +28,7 @@ module PubSub ## # @private Represents the Pub/Sub service API, including IAM mixins. class Service + attr_accessor :project attr_accessor :credentials attr_accessor :host @@ -40,15 +42,20 @@ class Service attr_reader :universe_domain + ## + # @private The Logging object. + attr_reader :logging + ## # Creates a new Service instance. - def initialize project, credentials, host: nil, timeout: nil, universe_domain: nil + def initialize project, credentials, host: nil, timeout: nil, universe_domain: nil, logging: nil @project = project @credentials = credentials @host = host @timeout = timeout @client_id = SecureRandom.uuid.freeze @universe_domain = universe_domain || ENV["GOOGLE_CLOUD_UNIVERSE_DOMAIN"] || "googleapis.com" + @logging = logging end def subscription_admin @@ -141,6 +148,7 @@ def streaming_pull request_enum, options = {} ## # Acknowledges receipt of a message. def acknowledge subscription, *ack_ids + logging.log_ack_nack ack_ids, "ack" subscription_admin.acknowledge_internal subscription: subscription_path(subscription), ack_ids: ack_ids end @@ -148,6 +156,9 @@ def acknowledge subscription, *ack_ids ## # Modifies the ack deadline for a specific message. def modify_ack_deadline subscription, ids, deadline + if deadline.zero? + logging.log_ack_nack Array(ids), "nack" + end subscription_admin.modify_ack_deadline_internal subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline @@ -193,6 +204,7 @@ def override_client_config_timeouts config rpc.timeout = timeout if rpc.respond_to? :timeout= end end + end end Pubsub = PubSub unless const_defined? :Pubsub diff --git a/google-cloud-pubsub/samples/Gemfile b/google-cloud-pubsub/samples/Gemfile index c537774d0a21..86b171a01b45 100644 --- a/google-cloud-pubsub/samples/Gemfile +++ b/google-cloud-pubsub/samples/Gemfile @@ -37,4 +37,5 @@ group :test do gem "rack-test" gem "rake" gem "toys-core" + gem "google-logging-utils", "~>0.3.0" end diff --git a/google-cloud-pubsub/samples/acceptance/topics_test.rb b/google-cloud-pubsub/samples/acceptance/topics_test.rb index 16224581c0c9..eae4ec7d7dd4 100644 --- a/google-cloud-pubsub/samples/acceptance/topics_test.rb +++ b/google-cloud-pubsub/samples/acceptance/topics_test.rb @@ -406,9 +406,10 @@ topic: @topic.name # pubsub_publish - assert_output "Message published asynchronously.\n" do + out, _err = capture_io do publish_message_async topic_id: topic_id end + assert_includes out, "Message published asynchronously." messages = [] expect_with_retry "pubsub_publish" do @@ -505,9 +506,10 @@ topic: @topic.name # pubsub_publisher_concurrency_control - assert_output "Message published asynchronously.\n" do + out, _err = capture_io do publish_messages_async_with_concurrency_control topic_id: topic_id end + assert_includes out, "Message published asynchronously." messages = [] expect_with_retry "pubsub_publisher_concurrency_control" do diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb index fec618a2e312..6383daf662fa 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb @@ -217,8 +217,13 @@ end it "removes expired items" do - subscriber_mock = Minitest::Mock.new - inventory = Google::Cloud::PubSub::MessageListener::Inventory.new subscriber_mock, + logging_mock = Minitest::Mock.new + logging_mock.expect :log_expiry, nil, [Array] + service_mock = OpenStruct.new logging: logging_mock + listener_mock = OpenStruct.new service: service_mock + stream_mock = OpenStruct.new subscriber: listener_mock + + inventory = Google::Cloud::PubSub::MessageListener::Inventory.new stream_mock, limit: 1000, bytesize: 100_000, extension: 3600, @@ -237,6 +242,8 @@ inventory.remove_expired! _(inventory.ack_ids).must_equal ["ack-id-1112", "ack-id-1113"] + + logging_mock.verify end it "knows its max_duration_per_lease_extension limit" do diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb index 8f065f4e1a8d..90bc34fbb254 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb @@ -15,6 +15,7 @@ require "helper" require "gapic/grpc/service_stub" require "google/cloud/pubsub/v1" +require "logger" describe Google::Cloud::PubSub::Service do class PubSubServiceTestDummyStub @@ -35,6 +36,20 @@ def logger end end + class MockLogging + def log_ack_nack ack_ids, type + # Do nothing + end + + def log_batch logger_name, reason, type, num_messages, total_bytes + # Do nothing + end + + def log_expiry expired + # Do nothing + end + end + let(:project) { "test" } let(:credentials) { :this_channel_is_insecure } let(:timeout) { 123.4 } @@ -42,6 +57,7 @@ def logger let(:endpoint_2) { "localhost:4567" } let(:universe_domain) { "googleapis.com" } let(:universe_domain_2) { "mydomain.com" } + let(:mock_logging) { MockLogging.new } # Values below are hardcoded in Service. let(:lib_name) { "gccl" } @@ -203,13 +219,13 @@ def logger end it "should raise errors other than grpc on ack" do - service = Google::Cloud::PubSub::Service.new project, nil + service = Google::Cloud::PubSub::Service.new project, nil, logging: mock_logging mocked_subscription_admin = Minitest::Mock.new service.mocked_subscription_admin = mocked_subscription_admin def mocked_subscription_admin.acknowledge_internal *args raise RuntimeError.new "test" end - assert_raises RuntimeError do + assert_raises RuntimeError do service.acknowledge "sub","ack_id" end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub_test.rb index aa5009c316b3..546d36dc37db 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub_test.rb @@ -97,12 +97,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -152,12 +153,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -184,12 +186,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -231,12 +234,13 @@ def creds.is_a? target it "allows timeout to be set" do timeout = 123.4 - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_equal timeout _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging } # Clear all environment variables @@ -255,12 +259,13 @@ def creds.is_a? target it "allows endpoint to be set" do endpoint = "localhost:4567" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_be :nil? _(host).must_equal endpoint _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project, credentials: credentials } @@ -283,12 +288,13 @@ def creds.is_a? target it "allows universe_domain to be set" do actual_universe_domain = "myuniverse.com" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_equal actual_universe_domain + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project, credentials: credentials } @@ -331,13 +337,14 @@ def creds.is_a? target _(scope).must_equal default_scopes OpenStruct.new project_id: "project-id" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_be_kind_of OpenStruct _(credentials.project_id).must_equal "project-id" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } empty_env = OpenStruct.new @@ -375,12 +382,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -413,12 +421,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -451,10 +460,11 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_equal 42.0 + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -488,10 +498,11 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_equal 42.0 + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -523,9 +534,10 @@ def creds.is_a? target actual_endpoint = "myendpoint.com" actual_universe_domain = "mydomain.com" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(host).must_equal actual_endpoint _(universe_domain).must_equal actual_universe_domain + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } diff --git a/google-cloud-pubsub/test/helper.rb b/google-cloud-pubsub/test/helper.rb index e5b42464bb68..3d19a6cd58ed 100644 --- a/google-cloud-pubsub/test/helper.rb +++ b/google-cloud-pubsub/test/helper.rb @@ -107,7 +107,12 @@ def message_hash class MockPubsub < Minitest::Spec let(:project) { "test" } let(:credentials) { OpenStruct.new(client: OpenStruct.new(updater_proc: Proc.new {})) } - let(:pubsub) { Google::Cloud::PubSub::Project.new(Google::Cloud::PubSub::Service.new(project, credentials)) } + let(:pubsub) do + logger = Google::Cloud.configure.pubsub.logger || Logger.new(STDOUT) + logging = Google::Cloud::PubSub::Logging.create logger + service = Google::Cloud::PubSub::Service.new project, credentials, logging: logging + Google::Cloud::PubSub::Project.new service + end def topics_hash num_topics, token = "" topics = num_topics.times.map do