From b9e21d9c717ecd961dcf3e4ad70635261816008b Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Thu, 30 Oct 2025 19:07:48 -0700 Subject: [PATCH 01/10] chore(Pubsub): Add adhoc debug logging --- .../google/cloud/pubsub/async_publisher.rb | 23 +++--- .../google/cloud/pubsub/batch_publisher.rb | 6 +- .../lib/google/cloud/pubsub/logger_helper.rb | 60 +++++++++++++++ .../google/cloud/pubsub/message_listener.rb | 5 +- .../pubsub/message_listener/inventory.rb | 22 ++++-- .../cloud/pubsub/message_listener/stream.rb | 65 ++++++++++++++-- .../message_listener/timed_unary_buffer.rb | 38 ++++++++-- .../lib/google/cloud/pubsub/publisher.rb | 3 +- .../lib/google/cloud/pubsub/service.rb | 9 +++ .../lib/google/cloud/pubsub/subscriber.rb | 74 ++++++++++++++++++- .../samples/acceptance/topics_test.rb | 6 +- 11 files changed, 273 insertions(+), 38 deletions(-) create mode 100644 google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb index 67becf27b7b0..aec834e510b0 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb @@ -21,6 +21,7 @@ require "google/cloud/pubsub/publish_result" require "google/cloud/pubsub/service" require "google/cloud/pubsub/convert" +require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -59,6 +60,7 @@ module PubSub # class AsyncPublisher include MonitorMixin + include Google::Cloud::PubSub::LoggerHelper attr_reader :topic_name attr_reader :max_bytes @@ -157,7 +159,7 @@ def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &cal end batch_action = batch.add msg, callback if batch_action == :full - publish_batches! + publish_batches! reason: "batch full" elsif @published_at.nil? # Set initial time to now to start the background counter @published_at = Time.now @@ -180,7 +182,7 @@ def stop break if @stopped @stopped = true - publish_batches! stop: true + publish_batches! stop: true, reason: "shutdown" @cond.signal @publish_thread_pool.shutdown end @@ -234,7 +236,7 @@ def stop! timeout = nil # @return [AsyncPublisher] returns self so calls can be chained. def flush synchronize do - publish_batches! + publish_batches! reason: "manual flush" @cond.signal end @@ -313,7 +315,7 @@ def run_background time_since_first_publish = Time.now - @published_at if time_since_first_publish > @interval # interval met, flush the batches... - publish_batches! + publish_batches! reason: "interval timeout" @cond.wait else # still waiting for the interval to publish the batch... @@ -347,28 +349,28 @@ def stop_publish ordering_key, err end end - def publish_batches! stop: nil + def publish_batches! stop: nil, reason: "unknown" @batches.reject! { |_ordering_key, batch| batch.empty? } @batches.each_value do |batch| ready = batch.publish! stop: stop - publish_batch_async @topic_name, batch if ready + publish_batch_async @topic_name, batch, reason: reason if ready end # Set published_at to nil to wait indefinitely @published_at = nil end - def publish_batch_async topic_name, batch + def publish_batch_async topic_name, batch, reason: "unknown" # TODO: raise unless @publish_thread_pool.running? return unless @publish_thread_pool.running? Concurrent::Promises.future_on( - @publish_thread_pool, topic_name, batch - ) { |t, b| publish_batch_sync t, b } + @publish_thread_pool, topic_name, batch, reason + ) { |t, b, r| publish_batch_sync t, b, reason: r } end # rubocop:disable Metrics/AbcSize - def publish_batch_sync topic_name, batch + def publish_batch_sync topic_name, batch, reason: "unknown" # The only batch methods that are safe to call from the loop are # rebalance! and reset! because they are the only methods that are # synchronized. @@ -379,6 +381,7 @@ def publish_batch_sync topic_name, batch grpc = @service.publish topic_name, items.map(&:msg), compress: compress && batch.total_message_bytes >= compression_bytes_threshold + log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize) items.zip Array(grpc.message_ids) do |item, id| @flow_controller.release item.bytesize next unless item.callback diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb index 613312619978..1d718d3c58f8 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb @@ -14,6 +14,7 @@ require "google/cloud/pubsub/convert" +require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -35,6 +36,8 @@ module PubSub # end # class BatchPublisher + include Google::Cloud::PubSub::LoggerHelper + ## # @private The messages to publish attr_reader :messages @@ -117,10 +120,11 @@ def to_gcloud_messages message_ids ## # @private Call the publish API with arrays of data and attrs. - def publish_batch_messages topic_name, service + def publish_batch_messages topic_name, service, reason: "unknown" grpc = service.publish topic_name, messages, compress: compress && total_message_bytes >= compression_bytes_threshold + log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes to_gcloud_messages Array(grpc.message_ids) end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb new file mode 100644 index 000000000000..c8698a2eed77 --- /dev/null +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -0,0 +1,60 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +require "logger" + +module Google + module Cloud + module PubSub + def self.logger name = "" + unless name.nil? || name.empty? + name = "pubsub:#{name}" + end + @loggers ||= {} + @loggers[name] ||= begin + logger = Logger.new $stdout + logger.progname = name + logger + end + end + + ## + # @private + module LoggerHelper + private + + def log_batch logger_name, reason, type, num_messages, total_bytes + Google::Cloud::PubSub.logger(logger_name).info( + "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" + ) + end + + def log_slow_ack subscriber, removed_items, type + return if subscriber.histograms.nil? || subscriber.histograms[type].nil? + time_now = Time.now + histogram = subscriber.histograms[type] + removed_items.each do |ack_id, item| + duration_s = time_now - item.pulled_at + percentile_s = histogram.percentile 99 + histogram.add duration_s + next unless duration_s > percentile_s + Google::Cloud::PubSub.logger("slow-ack").info( + "message (ID #{item.message_id}, ackID #{ack_id}) #{type} took longer than the 99th percentile of " \ + "message processing time (#{type} duration: #{duration_s} s, 99th percentile: #{percentile_s} s)" + ) + end + end + end + end + end +end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb index 814ed067ee7d..d041670837c5 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb @@ -14,6 +14,7 @@ require "google/cloud/pubsub/service" +require "google/cloud/pubsub/subscriber" require "google/cloud/pubsub/message_listener/stream" require "google/cloud/pubsub/message_listener/timed_unary_buffer" require "monitor" @@ -71,6 +72,7 @@ class MessageListener attr_reader :message_ordering attr_reader :callback_threads attr_reader :push_threads + attr_reader :histograms ## # @private Implementation attributes. @@ -83,7 +85,7 @@ class MessageListener ## # @private Create an empty {MessageListener} object. def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil, - threads: {}, service: nil + threads: {}, service: nil, histograms: nil super() # to init MonitorMixin @callback = callback @@ -96,6 +98,7 @@ def initialize subscription_name, callback, deadline: nil, message_ordering: nil @callback_threads = Integer(threads[:callback] || 8) @push_threads = Integer(threads[:push] || 4) @exactly_once_delivery_enabled = nil + @histograms = histograms @service = service diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index ca2333d76c1c..388105fea2b2 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +require "google/cloud/pubsub/logger_helper" require "monitor" module Google @@ -22,9 +22,9 @@ class MessageListener ## # @private class Inventory - InventoryItem = Struct.new :bytesize, :pulled_at do + InventoryItem = Struct.new :message_id, :bytesize, :pulled_at do def self.from rec_msg - new rec_msg.to_proto.bytesize, Time.now + new rec_msg.message.message_id, rec_msg.to_proto.bytesize, Time.now end end @@ -70,18 +70,28 @@ def add *rec_msgs def remove *ack_ids ack_ids.flatten! ack_ids.compact! - return if ack_ids.empty? + return {} if ack_ids.empty? + removed_items = {} synchronize do - @inventory.delete_if { |ack_id, _| ack_ids.include? ack_id } + removed, keep = @inventory.partition { |ack_id, _| ack_ids.include? ack_id } + @inventory = keep.to_h + removed_items = removed.to_h @wait_cond.broadcast end + removed_items end def remove_expired! synchronize do extension_time = Time.new - extension - @inventory.delete_if { |_ack_id, item| item.pulled_at < extension_time } + expired, keep = @inventory.partition { |_ack_id, item| item.pulled_at < extension_time } + @inventory = keep.to_h + expired.each do |ack_id, item| + Google::Cloud::PubSub.logger("expiry").info( + "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout" + ) + end @wait_cond.broadcast end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index f7e0277d0f7f..4d7c17e3cc71 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +require "google/cloud/pubsub/logger_helper" require "google/cloud/pubsub/message_listener/sequencer" require "google/cloud/pubsub/message_listener/enumerator_queue" require "google/cloud/pubsub/message_listener/inventory" @@ -29,6 +29,7 @@ class MessageListener # @private class Stream include MonitorMixin + include LoggerHelper ## # @private Implementation attributes. @@ -73,7 +74,12 @@ def initialize subscriber execution_interval: 30 ) do # push empty request every 30 seconds to keep stream alive - push Google::Cloud::PubSub::V1::StreamingPullRequest.new unless inventory.empty? + unless inventory.empty? + Google::Cloud::PubSub.logger("subscriber-streams").info( + "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + ) + push Google::Cloud::PubSub::V1::StreamingPullRequest.new + end end.execute end @@ -93,6 +99,9 @@ def stop synchronize do break if @stopped + Google::Cloud::PubSub.logger("subscriber-streams").info( + "stopping stream for subscription #{@subscriber.subscription_name}" + ) # Close the stream by pushing the sentinel value. # The unary pusher does not use the stream, so it can close here. @request_queue&.push self @@ -138,11 +147,14 @@ def acknowledge *messages, &callback ack_ids = coerce_ack_ids messages return true if ack_ids.empty? + removed_items = {} synchronize do - @inventory.remove ack_ids + removed_items = @inventory.remove ack_ids @subscriber.buffer.acknowledge ack_ids, callback end + log_slow_ack @subscriber, removed_items, "ack" + true end @@ -152,11 +164,14 @@ def modify_ack_deadline deadline, *messages, &callback mod_ack_ids = coerce_ack_ids messages return true if mod_ack_ids.empty? + removed_items = {} synchronize do - @inventory.remove mod_ack_ids + removed_items = @inventory.remove mod_ack_ids @subscriber.buffer.modify_ack_deadline deadline, mod_ack_ids, callback end + log_slow_ack @subscriber, removed_items, "nack" if deadline.zero? + true end @@ -215,7 +230,13 @@ class RestartStream < StandardError; end def background_run synchronize do # Don't allow a stream to restart if already stopped - return if @stopped + if @stopped + Google::Cloud::PubSub.logger("subscriber-streams").debug( + "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ + " stopped" + ) + return + end @stopped = false @paused = false @@ -233,6 +254,9 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } enum = @subscriber.service.streaming_pull @request_queue.each, options + Google::Cloud::PubSub.logger("subscriber-streams").info( + "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" + ) loop do synchronize do @@ -287,13 +311,23 @@ def background_run stop rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal, GRPC::ResourceExhausted, GRPC::Unauthenticated, - GRPC::Unavailable + GRPC::Unavailable => e + status_code = e.respond_to?(:code) ? e.code : e.class.name + Google::Cloud::PubSub.logger("subscriber-streams").error( + "Subscriber stream for subscription #{@subscriber.subscription_name} has ended with status " \ + "#{status_code}; will be retried." + ) # Restart the stream with an incremental back for a retriable error. - retry rescue RestartStream + Google::Cloud::PubSub.logger("subscriber-streams").info( + "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." + ) retry rescue StandardError => e + Google::Cloud::PubSub.logger("subscriber-streams").error( + "error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}" + ) @subscriber.error! e retry @@ -336,13 +370,22 @@ def perform_callback_async rec_msg return unless callback_thread_pool.running? Concurrent::Promises.future_on( - callback_thread_pool, rec_msg, &method(:perform_callback_sync) + callback_thread_pool, + rec_msg, + &method(:perform_callback_sync) ) end def perform_callback_sync rec_msg + Google::Cloud::PubSub.logger("callback-delivery").info( + "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" + ) @subscriber.callback.call rec_msg unless stopped? rescue StandardError => e + Google::Cloud::PubSub.logger("callback-exceptions").info( + "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ + "#{e.inspect}" + ) @subscriber.error! e ensure release rec_msg @@ -369,6 +412,9 @@ def pause_streaming! return unless pause_streaming? @paused = true + Google::Cloud::PubSub.logger("subscriber-flow-control").info( + "subscriber for #{@subscriber.subscription_name} is client-side flow control blocked" + ) end def pause_streaming? @@ -382,6 +428,9 @@ def unpause_streaming! return unless unpause_streaming? @paused = nil + Google::Cloud::PubSub.logger("subscriber-flow-control").info( + "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" + ) # signal to the background thread that we are unpaused @pause_cond.broadcast end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb index 525bc4746f4f..58b4ab468cb0 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb @@ -18,6 +18,7 @@ require "google/cloud/pubsub/acknowledge_result" require "monitor" require "retriable" +require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -27,6 +28,7 @@ class MessageListener # @private class TimedUnaryBuffer include MonitorMixin + include Google::Cloud::PubSub::LoggerHelper attr_reader :max_bytes attr_reader :interval @@ -64,7 +66,7 @@ def initialize subscriber, max_bytes: 500_000, interval: 1.0 @retry_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads @task = Concurrent::TimerTask.new execution_interval: interval do - flush! + flush! reason: "interval timeout" end end @@ -108,9 +110,9 @@ def renew_lease deadline, ack_ids true end - def flush! + def flush! reason: "manual flush" # Grab requests from the buffer and release synchronize ASAP - requests = flush_requests! + requests = flush_requests! reason return if requests.empty? # Perform the RCP calls concurrently @@ -167,7 +169,7 @@ def stop @task.shutdown @retry_thread_pool.shutdown @callback_thread_pool.shutdown - flush! + flush! reason: "shutdown" self end @@ -296,7 +298,9 @@ def retry_request ack_ids, modack end end - def flush_requests! + # rubocop:disable Metrics/AbcSize + + def flush_requests! reason prev_reg = synchronize do return {} if @register.empty? @@ -308,16 +312,34 @@ def flush_requests! groups = prev_reg.each_pair.group_by { |_ack_id, delay| delay } req_hash = groups.transform_values { |v| v.map(&:first) } - requests = { acknowledge: [] } + requests = { acknowledge: [], modify_ack_deadline: [] } ack_ids = Array(req_hash.delete(:ack)) # ack has no deadline set - requests[:acknowledge] = create_acknowledge_requests ack_ids if ack_ids.any? + if ack_ids.any? + requests[:acknowledge] = create_acknowledge_requests ack_ids + new_reason = if requests[:acknowledge].length > 1 + "#{reason} and partitioned for exceeding max bytes" + else + reason + end + requests[:acknowledge].each do |req| + log_batch "ack-batch", new_reason, "ack", req.ack_ids.length, req.to_proto.bytesize + end + end requests[:modify_ack_deadline] = req_hash.map do |mod_deadline, mod_ack_ids| - create_modify_ack_deadline_requests mod_deadline, mod_ack_ids + mod_ack_reqs = create_modify_ack_deadline_requests mod_deadline, mod_ack_ids + type = mod_deadline.zero? ? "nack" : "modack" + new_reason = mod_ack_reqs.length > 1 ? "#{reason} and partitioned for exceeding max bytes" : reason + mod_ack_reqs.each do |req| + log_batch "ack-batch", new_reason, type, req.ack_ids.length, req.to_proto.bytesize + end + mod_ack_reqs end.flatten requests end + # rubocop:enable Metrics/AbcSize + def create_acknowledge_requests ack_ids req = Google::Cloud::PubSub::V1::AcknowledgeRequest.new( subscription: subscription_name, diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb index 43cb7a4b2fc9..0134324ab32f 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/publisher.rb @@ -223,7 +223,8 @@ def publish data = nil, attributes = nil, ordering_key: nil, compress: nil, comp block&.call batch return nil if batch.messages.count.zero? - batch.publish_batch_messages name, service + reason = block_given? ? "synchronous publish multiple" : "synchronous publish single" + batch.publish_batch_messages name, service, reason: reason end ## diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index 344205c0bab3..5586d8c6d55c 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -19,6 +19,7 @@ require "google/cloud/pubsub/version" require "google/cloud/pubsub/v1" require "google/cloud/pubsub/admin_clients" +require "google/cloud/pubsub/logger_helper" require "securerandom" module Google @@ -141,6 +142,9 @@ def streaming_pull request_enum, options = {} ## # Acknowledges receipt of a message. def acknowledge subscription, *ack_ids + ack_ids.each do |ack_id| + Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) ack" + end subscription_admin.acknowledge_internal subscription: subscription_path(subscription), ack_ids: ack_ids end @@ -148,6 +152,11 @@ def acknowledge subscription, *ack_ids ## # Modifies the ack deadline for a specific message. def modify_ack_deadline subscription, ids, deadline + if deadline.zero? + Array(ids).each do |ack_id| + Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) nack" + end + end subscription_admin.modify_ack_deadline_internal subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb index 3abb7156f423..5c76af4e142c 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb @@ -54,6 +54,72 @@ module PubSub # listener.start # sleep class Subscriber + # @private + # A simple, non-pre-bucketed histogram for tracking numeric values + # and calculating percentiles. This is used internally to track message + # processing latencies for features like slow-ack logging. + class Histogram + # @private + # Creates a new Histogram. + # + # @param min_seconds [Numeric] The minimum value in seconds to record. + # Values below this will be clamped to this minimum. Defaults to 0. + # @param max_seconds [Numeric] The maximum value in seconds to record. + # Values above this will be clamped to this maximum. Defaults to + # `Float::INFINITY`. + # @param multiplier [Numeric] A multiplier to convert the seconds + # value to the desired internal unit. Defaults to 1,000,000,000.0 + # for nanoseconds. + def initialize min_seconds: 0, max_seconds: Float::INFINITY, multiplier: 1_000_000_000.0 + @min = min_seconds * multiplier + @max = max_seconds * multiplier + @multiplier = multiplier + @data = {} # Using a Hash as a Map + @length = 0 + @mutex = Mutex.new + end + + # Adds a value to the histogram. + # + # @param value_seconds [Numeric] The value to record, in seconds. + def add value_seconds + value = value_seconds * @multiplier + @mutex.synchronize do + value = value.ceil + value = [@min, value].max + value = [@max, value].min + + @data[value] ||= 0 + @data[value] += 1 + @length += 1 + end + end + + # Calculates a percentile from the values in the histogram. + # + # @param percent [Numeric] The percentile to calculate, from 0 to 100. + # + # @return [Numeric] The value at the given percentile, in seconds. + def percentile percent + @mutex.synchronize do + return @min / @multiplier if @length.zero? + + percent = [percent, 100].min + target = @length - (@length * (percent / 100.0)) + + keys = @data.keys.sort + + keys.reverse_each do |key| + target -= @data[key] + return key / @multiplier if target <= 0 + end + + @min / @multiplier + end + end + end + private_constant :Histogram + ## # @private The Service object. attr_accessor :service @@ -62,6 +128,10 @@ class Subscriber # @private The gRPC Google::Cloud::PubSub::V1::Subscription object. attr_accessor :grpc + ## + # @private The histograms for this subscriber. + attr_reader :histograms + ## # @private Create an empty {Subscriber} object. def initialize @@ -69,6 +139,7 @@ def initialize @grpc = nil @resource_name = nil @exists = nil + @histograms = { "ack" => Histogram.new, "nack" => Histogram.new } end ## @@ -414,7 +485,8 @@ def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, t message_ordering = message_ordering? if message_ordering.nil? MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory, - message_ordering: message_ordering, threads: threads, service: service + message_ordering: message_ordering, threads: threads, service: service, + histograms: histograms end ## diff --git a/google-cloud-pubsub/samples/acceptance/topics_test.rb b/google-cloud-pubsub/samples/acceptance/topics_test.rb index 16224581c0c9..eae4ec7d7dd4 100644 --- a/google-cloud-pubsub/samples/acceptance/topics_test.rb +++ b/google-cloud-pubsub/samples/acceptance/topics_test.rb @@ -406,9 +406,10 @@ topic: @topic.name # pubsub_publish - assert_output "Message published asynchronously.\n" do + out, _err = capture_io do publish_message_async topic_id: topic_id end + assert_includes out, "Message published asynchronously." messages = [] expect_with_retry "pubsub_publish" do @@ -505,9 +506,10 @@ topic: @topic.name # pubsub_publisher_concurrency_control - assert_output "Message published asynchronously.\n" do + out, _err = capture_io do publish_messages_async_with_concurrency_control topic_id: topic_id end + assert_includes out, "Message published asynchronously." messages = [] expect_with_retry "pubsub_publisher_concurrency_control" do From 965f6861a2c9b270f532656fdaf95c6044846aa3 Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Tue, 11 Nov 2025 19:17:36 -0800 Subject: [PATCH 02/10] chore(Pubsub): remove Histogram implementation --- .../lib/google/cloud/pubsub/logger_helper.rb | 17 +---- .../google/cloud/pubsub/message_listener.rb | 4 +- .../cloud/pubsub/message_listener/stream.rb | 4 - .../lib/google/cloud/pubsub/subscriber.rb | 74 +------------------ 4 files changed, 3 insertions(+), 96 deletions(-) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb index c8698a2eed77..78dec3c24c86 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -38,22 +38,7 @@ def log_batch logger_name, reason, type, num_messages, total_bytes "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" ) end - - def log_slow_ack subscriber, removed_items, type - return if subscriber.histograms.nil? || subscriber.histograms[type].nil? - time_now = Time.now - histogram = subscriber.histograms[type] - removed_items.each do |ack_id, item| - duration_s = time_now - item.pulled_at - percentile_s = histogram.percentile 99 - histogram.add duration_s - next unless duration_s > percentile_s - Google::Cloud::PubSub.logger("slow-ack").info( - "message (ID #{item.message_id}, ackID #{ack_id}) #{type} took longer than the 99th percentile of " \ - "message processing time (#{type} duration: #{duration_s} s, 99th percentile: #{percentile_s} s)" - ) - end - end + end end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb index d041670837c5..9669d5e621a5 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb @@ -72,7 +72,6 @@ class MessageListener attr_reader :message_ordering attr_reader :callback_threads attr_reader :push_threads - attr_reader :histograms ## # @private Implementation attributes. @@ -85,7 +84,7 @@ class MessageListener ## # @private Create an empty {MessageListener} object. def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil, - threads: {}, service: nil, histograms: nil + threads: {}, service: nil super() # to init MonitorMixin @callback = callback @@ -98,7 +97,6 @@ def initialize subscription_name, callback, deadline: nil, message_ordering: nil @callback_threads = Integer(threads[:callback] || 8) @push_threads = Integer(threads[:push] || 4) @exactly_once_delivery_enabled = nil - @histograms = histograms @service = service diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index 4d7c17e3cc71..c07e82b6ab9f 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -153,8 +153,6 @@ def acknowledge *messages, &callback @subscriber.buffer.acknowledge ack_ids, callback end - log_slow_ack @subscriber, removed_items, "ack" - true end @@ -170,8 +168,6 @@ def modify_ack_deadline deadline, *messages, &callback @subscriber.buffer.modify_ack_deadline deadline, mod_ack_ids, callback end - log_slow_ack @subscriber, removed_items, "nack" if deadline.zero? - true end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb index 5c76af4e142c..3abb7156f423 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb @@ -54,72 +54,6 @@ module PubSub # listener.start # sleep class Subscriber - # @private - # A simple, non-pre-bucketed histogram for tracking numeric values - # and calculating percentiles. This is used internally to track message - # processing latencies for features like slow-ack logging. - class Histogram - # @private - # Creates a new Histogram. - # - # @param min_seconds [Numeric] The minimum value in seconds to record. - # Values below this will be clamped to this minimum. Defaults to 0. - # @param max_seconds [Numeric] The maximum value in seconds to record. - # Values above this will be clamped to this maximum. Defaults to - # `Float::INFINITY`. - # @param multiplier [Numeric] A multiplier to convert the seconds - # value to the desired internal unit. Defaults to 1,000,000,000.0 - # for nanoseconds. - def initialize min_seconds: 0, max_seconds: Float::INFINITY, multiplier: 1_000_000_000.0 - @min = min_seconds * multiplier - @max = max_seconds * multiplier - @multiplier = multiplier - @data = {} # Using a Hash as a Map - @length = 0 - @mutex = Mutex.new - end - - # Adds a value to the histogram. - # - # @param value_seconds [Numeric] The value to record, in seconds. - def add value_seconds - value = value_seconds * @multiplier - @mutex.synchronize do - value = value.ceil - value = [@min, value].max - value = [@max, value].min - - @data[value] ||= 0 - @data[value] += 1 - @length += 1 - end - end - - # Calculates a percentile from the values in the histogram. - # - # @param percent [Numeric] The percentile to calculate, from 0 to 100. - # - # @return [Numeric] The value at the given percentile, in seconds. - def percentile percent - @mutex.synchronize do - return @min / @multiplier if @length.zero? - - percent = [percent, 100].min - target = @length - (@length * (percent / 100.0)) - - keys = @data.keys.sort - - keys.reverse_each do |key| - target -= @data[key] - return key / @multiplier if target <= 0 - end - - @min / @multiplier - end - end - end - private_constant :Histogram - ## # @private The Service object. attr_accessor :service @@ -128,10 +62,6 @@ def percentile percent # @private The gRPC Google::Cloud::PubSub::V1::Subscription object. attr_accessor :grpc - ## - # @private The histograms for this subscriber. - attr_reader :histograms - ## # @private Create an empty {Subscriber} object. def initialize @@ -139,7 +69,6 @@ def initialize @grpc = nil @resource_name = nil @exists = nil - @histograms = { "ack" => Histogram.new, "nack" => Histogram.new } end ## @@ -485,8 +414,7 @@ def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, t message_ordering = message_ordering? if message_ordering.nil? MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory, - message_ordering: message_ordering, threads: threads, service: service, - histograms: histograms + message_ordering: message_ordering, threads: threads, service: service end ## From a1336ecd08c41bbf9ca2ecbdce176b9eb2692327 Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Wed, 12 Nov 2025 15:36:38 -0800 Subject: [PATCH 03/10] chore(Pubsub): Add GOOGLE_SDK_RUBY_LOGGING enviornmental variable to turn logs on/off --- .../lib/google/cloud/pubsub/logger_helper.rb | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb index 78dec3c24c86..ce5b4c7ebc04 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -17,17 +17,25 @@ module Google module Cloud module PubSub def self.logger name = "" - unless name.nil? || name.empty? - name = "pubsub:#{name}" - end @loggers ||= {} @loggers[name] ||= begin - logger = Logger.new $stdout - logger.progname = name - logger + if is_logging_enabled + logger = Logger.new $stdout + prog_name = "pubsub" + prog_name = "#{prog_name}:#{name}" unless name.nil? || name.empty? + logger.progname = prog_name + logger + else + Logger.new nil + end end end + def self.is_logging_enabled + packages = ENV["GOOGLE_SDK_RUBY_LOGGING"]&.split(",") || [] + packages.include?("pubsub") || packages.include?("all") + end + ## # @private module LoggerHelper From dd580ccc918ceef2da933024fa2ccfb72a93a422 Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Wed, 12 Nov 2025 15:37:01 -0800 Subject: [PATCH 04/10] chore(PubSub): Make logging safe in service.rb --- .../lib/google/cloud/pubsub/logger_helper.rb | 12 +++++++++++- .../lib/google/cloud/pubsub/service.rb | 10 ++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb index ce5b4c7ebc04..ff584cfabff5 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -46,7 +46,17 @@ def log_batch logger_name, reason, type, num_messages, total_bytes "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" ) end - + + def log_ack_nack_safely ack_ids, type + return unless Google::Cloud::PubSub.is_logging_enabled + begin + ack_ids.each do |ack_id| + Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) #{type}" + end + rescue StandardError + # Ignore all logging errors. + end + end end end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index 5586d8c6d55c..8f0769f022a1 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -28,6 +28,8 @@ module PubSub ## # @private Represents the Pub/Sub service API, including IAM mixins. class Service + include LoggerHelper + attr_accessor :project attr_accessor :credentials attr_accessor :host @@ -142,9 +144,7 @@ def streaming_pull request_enum, options = {} ## # Acknowledges receipt of a message. def acknowledge subscription, *ack_ids - ack_ids.each do |ack_id| - Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) ack" - end + log_ack_nack_safely ack_ids, "ack" subscription_admin.acknowledge_internal subscription: subscription_path(subscription), ack_ids: ack_ids end @@ -153,9 +153,7 @@ def acknowledge subscription, *ack_ids # Modifies the ack deadline for a specific message. def modify_ack_deadline subscription, ids, deadline if deadline.zero? - Array(ids).each do |ack_id| - Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) nack" - end + log_ack_nack_safely Array(ids), "nack" end subscription_admin.modify_ack_deadline_internal subscription: subscription_path(subscription), ack_ids: Array(ids), From 9f7642a10b791e8726ab3c14f1ee6719cc2b720b Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Fri, 14 Nov 2025 18:13:49 -0800 Subject: [PATCH 05/10] chore(PubSub): allowing user to configure logger using Google::Cloud.configure --- google-cloud-pubsub/lib/google-cloud-pubsub.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-pubsub/lib/google-cloud-pubsub.rb b/google-cloud-pubsub/lib/google-cloud-pubsub.rb index da6899323b9a..1d9f1b2e3cd4 100644 --- a/google-cloud-pubsub/lib/google-cloud-pubsub.rb +++ b/google-cloud-pubsub/lib/google-cloud-pubsub.rb @@ -23,6 +23,7 @@ require "google/cloud" unless defined? Google::Cloud.new require "google/cloud/config" require "googleauth" +require "logger" module Google module Cloud @@ -142,6 +143,8 @@ def self.pubsub project_id = nil, "https://www.googleapis.com/auth/pubsub" ] + default_logger = Logger.new $stdout + config.add_field! :project_id, default_project, match: String, allow_nil: true config.add_alias! :project, :project_id config.add_field! :credentials, default_creds, match: [String, Hash, Google::Auth::Credentials], allow_nil: true @@ -153,4 +156,5 @@ def self.pubsub project_id = nil, config.add_field! :on_error, nil, match: Proc config.add_field! :endpoint, nil, match: String config.add_field! :universe_domain, nil, match: String + config.add_field! :logger, default_logger, match: Logger, allow_nil: true end From ac04a73eaa76a1babddd4648181c7e65e78873a3 Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Fri, 14 Nov 2025 18:14:48 -0800 Subject: [PATCH 06/10] chore(PubSub): making all logs block-based and catching and killing all exceptions --- .../lib/google/cloud/pubsub/logger_helper.rb | 70 +++++++++++++------ .../pubsub/message_listener/inventory.rb | 7 +- .../cloud/pubsub/message_listener/stream.rb | 46 ++++++------ .../lib/google/cloud/pubsub/service.rb | 4 +- 4 files changed, 74 insertions(+), 53 deletions(-) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb index ff584cfabff5..594b59107201 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -16,45 +16,69 @@ module Google module Cloud module PubSub - def self.logger name = "" - @loggers ||= {} - @loggers[name] ||= begin - if is_logging_enabled - logger = Logger.new $stdout - prog_name = "pubsub" - prog_name = "#{prog_name}:#{name}" unless name.nil? || name.empty? - logger.progname = prog_name - logger - else - Logger.new nil - end - end + LOG_NAME = "pubsub".freeze + private_constant :LOG_NAME + + def self.logger + is_logging_enabled ? configure.logger : Logger.new(nil) end def self.is_logging_enabled - packages = ENV["GOOGLE_SDK_RUBY_LOGGING"]&.split(",") || [] - packages.include?("pubsub") || packages.include?("all") + begin + env_var = ENV["GOOGLE_SDK_RUBY_LOGGING_GEMS"] + return false if (configure.logger.nil? || env_var == "none") + return true if env_var == "all" + # parse env var by removing whitespace and splitting by comma + packages = env_var&.gsub(/\s+/, "")&.split(",") || [] + packages.include?(LOG_NAME) + rescue StandardError => e + warn "Failed to determine logging configuration. Logging disabled. Error: #{e.class}: #{e.message}" + false + end end ## # @private module LoggerHelper + VALID_LOG_LEVELS = [:debug, :info, :warn, :error, :fatal].freeze + private_constant :VALID_LOG_LEVELS + private + # rubocop:disable Naming/BlockForwarding + def log_safely level, subtag, &message_block + return unless VALID_LOG_LEVELS.include?(level) && block_given? + begin + Google::Cloud::PubSub.logger.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block) + rescue StandardError + # Ignore all logging errors. + end + end + # rubocop:enable Naming/BlockForwarding + def log_batch logger_name, reason, type, num_messages, total_bytes - Google::Cloud::PubSub.logger(logger_name).info( + log_safely :info, logger_name do "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" - ) + end end - def log_ack_nack_safely ack_ids, type + def log_ack_nack ack_ids, type + # exit early to avoid unnecessary loop return unless Google::Cloud::PubSub.is_logging_enabled - begin - ack_ids.each do |ack_id| - Google::Cloud::PubSub.logger("ack-nack").info "message (ackID #{ack_id}) #{type}" + ack_ids.each do |ack_id| + log_safely :info, "ack-nack" do + "message (ackID #{ack_id}) #{type}" + end + end + end + + def log_expiry expired + # exit early to avoid unnecessary loop + return unless Google::Cloud::PubSub.is_logging_enabled + expired.each do |ack_id, item| + log_safely :info, "expiry" do + "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout" end - rescue StandardError - # Ignore all logging errors. end end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index 388105fea2b2..2a3e61eb1e59 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -29,6 +29,7 @@ def self.from rec_msg end include MonitorMixin + include LoggerHelper attr_reader :stream attr_reader :limit @@ -87,11 +88,7 @@ def remove_expired! extension_time = Time.new - extension expired, keep = @inventory.partition { |_ack_id, item| item.pulled_at < extension_time } @inventory = keep.to_h - expired.each do |ack_id, item| - Google::Cloud::PubSub.logger("expiry").info( - "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout" - ) - end + log_expiry expired @wait_cond.broadcast end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index c07e82b6ab9f..bfd4a0432b4d 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -75,9 +75,9 @@ def initialize subscriber ) do # push empty request every 30 seconds to keep stream alive unless inventory.empty? - Google::Cloud::PubSub.logger("subscriber-streams").info( + log_safely :info, "subscriber-streams" do "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" - ) + end push Google::Cloud::PubSub::V1::StreamingPullRequest.new end end.execute @@ -99,9 +99,9 @@ def stop synchronize do break if @stopped - Google::Cloud::PubSub.logger("subscriber-streams").info( + log_safely :info, "subscriber-streams" do "stopping stream for subscription #{@subscriber.subscription_name}" - ) + end # Close the stream by pushing the sentinel value. # The unary pusher does not use the stream, so it can close here. @request_queue&.push self @@ -227,10 +227,10 @@ def background_run synchronize do # Don't allow a stream to restart if already stopped if @stopped - Google::Cloud::PubSub.logger("subscriber-streams").debug( + log_safely :debug, "subscriber-streams" do "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ " stopped" - ) + end return end @@ -250,9 +250,9 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } enum = @subscriber.service.streaming_pull @request_queue.each, options - Google::Cloud::PubSub.logger("subscriber-streams").info( + log_safely :info, "subscriber-streams" do "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" - ) + end loop do synchronize do @@ -309,21 +309,21 @@ def background_run GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable => e status_code = e.respond_to?(:code) ? e.code : e.class.name - Google::Cloud::PubSub.logger("subscriber-streams").error( + log_safely :error, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended with status " \ "#{status_code}; will be retried." - ) + end # Restart the stream with an incremental back for a retriable error. retry rescue RestartStream - Google::Cloud::PubSub.logger("subscriber-streams").info( + log_safely :info, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." - ) + end retry rescue StandardError => e - Google::Cloud::PubSub.logger("subscriber-streams").error( + log_safely :error, "subscriber-streams" do "error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}" - ) + end @subscriber.error! e retry @@ -373,15 +373,15 @@ def perform_callback_async rec_msg end def perform_callback_sync rec_msg - Google::Cloud::PubSub.logger("callback-delivery").info( + log_safely :info, "callback-delivery" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" - ) + end @subscriber.callback.call rec_msg unless stopped? rescue StandardError => e - Google::Cloud::PubSub.logger("callback-exceptions").info( + log_safely :info, "callback-exceptions" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ - "#{e.inspect}" - ) + "#{e.inspect}" + end @subscriber.error! e ensure release rec_msg @@ -408,9 +408,9 @@ def pause_streaming! return unless pause_streaming? @paused = true - Google::Cloud::PubSub.logger("subscriber-flow-control").info( + log_safely :info, "subscriber-flow-control" do "subscriber for #{@subscriber.subscription_name} is client-side flow control blocked" - ) + end end def pause_streaming? @@ -424,9 +424,9 @@ def unpause_streaming! return unless unpause_streaming? @paused = nil - Google::Cloud::PubSub.logger("subscriber-flow-control").info( + log_safely :info, "subscriber-flow-control" do "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" - ) + end # signal to the background thread that we are unpaused @pause_cond.broadcast end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index 8f0769f022a1..a992f27a14f0 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -144,7 +144,7 @@ def streaming_pull request_enum, options = {} ## # Acknowledges receipt of a message. def acknowledge subscription, *ack_ids - log_ack_nack_safely ack_ids, "ack" + log_ack_nack ack_ids, "ack" subscription_admin.acknowledge_internal subscription: subscription_path(subscription), ack_ids: ack_ids end @@ -153,7 +153,7 @@ def acknowledge subscription, *ack_ids # Modifies the ack deadline for a specific message. def modify_ack_deadline subscription, ids, deadline if deadline.zero? - log_ack_nack_safely Array(ids), "nack" + log_ack_nack Array(ids), "nack" end subscription_admin.modify_ack_deadline_internal subscription: subscription_path(subscription), ack_ids: Array(ids), From 9b62384f4539942ab2705bb71c7f25a89a467191 Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Fri, 14 Nov 2025 18:27:55 -0800 Subject: [PATCH 07/10] chore(PubSub): add adhoc debug logging info to readme --- google-cloud-pubsub/README.md | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 080d33a1cf61..310cc4bca0f8 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -92,6 +92,44 @@ module GRPC end ``` +### Enabling library level logging + +This library includes an opt-in logging mechanism that provides detailed information about high-level operations. These logs are useful for troubleshooting and monitoring the client's behavior. When enabled, logs are tagged with subtags to indicate the operation type. + +The following subtags are used: + +* `callback-delivery`: Logs when a message is delivered to the user-provided callback. +* `callback-exceptions`: Logs any exceptions raised from the user callback. +* `ack-nack`: Logs when a message is acknowledged (`ack`) or negatively acknowledged (`nack`). +* `ack-batch`: Logs the reason and size of acknowledgement batches sent to the server. +* `publish-batch`: Logs the reason and size of message batches sent to the server for publishing. +* `expiry`: Logs when a message's lease expires and it is dropped from client-side lease management. +* `subscriber-streams`: Logs key events in the subscriber's streaming connection, such as opening, closing, and errors. +* `subscriber-flow-control`: Logs when the subscriber's client-side flow control is paused or resumed. + +**WARNING:** These logs may contain message data in plaintext, which could include sensitive information. Ensure you are practicing good data hygiene with your application logs. It is recommended to enable this logging only for debugging purposes and not permanently in production. + +To enable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to `all`. +To enable logging for just the google-cloud-pubsub gem, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to a comma separated string that contains `pubsub` +To disable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` to `none` + +```sh +export GOOGLE_SDK_RUBY_LOGGING_GEMS=pubsub +``` + +You can programmatically configure a custom logger: + +```ruby +require "google/cloud/pubsub" +require "logger" + +# Configure a logger for the pubsub library +Google::Cloud.configure.pubsub.logger = Logger.new "my-app.log" +``` + +If the custom logger is not configured, it will default to a standard stdout logger. + + ## Supported Ruby Versions This library is supported on Ruby 3.1+. From 6b0754ccce0390c90b8dde89e756269bf45542df Mon Sep 17 00:00:00 2001 From: Charlotte Yun Date: Fri, 14 Nov 2025 18:40:52 -0800 Subject: [PATCH 08/10] chore(PubSub): adding docs to fix ci failure --- .../lib/google/cloud/pubsub/logger_helper.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb index 594b59107201..92c908245dd2 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb @@ -19,10 +19,16 @@ module PubSub LOG_NAME = "pubsub".freeze private_constant :LOG_NAME + ## + # The logger for the Pub/Sub library. + # @return [Logger] def self.logger is_logging_enabled ? configure.logger : Logger.new(nil) end + ## + # Checks if logging is enabled for the Pub/Sub library. + # @return [Boolean] def self.is_logging_enabled begin env_var = ENV["GOOGLE_SDK_RUBY_LOGGING_GEMS"] From 77e21884be22a324f0c4c93d6c10e5dc200023fc Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Tue, 2 Dec 2025 19:37:32 -0800 Subject: [PATCH 09/10] chore(PubSub): Implement logging with GoogleSdkLoggerDelegator --- google-cloud-pubsub-v1/.owlbot.rb | 46 ++++++++++++ google-cloud-pubsub/Gemfile | 1 + google-cloud-pubsub/README.md | 19 ++++- .../lib/google/cloud/pubsub.rb | 16 ++++- .../google/cloud/pubsub/async_publisher.rb | 4 +- .../google/cloud/pubsub/batch_publisher.rb | 4 +- .../pubsub/{logger_helper.rb => logging.rb} | 72 ++++++++----------- .../pubsub/message_listener/inventory.rb | 4 +- .../cloud/pubsub/message_listener/stream.rb | 24 +++---- .../message_listener/timed_unary_buffer.rb | 6 +- .../lib/google/cloud/pubsub/service.rb | 15 ++-- google-cloud-pubsub/samples/Gemfile | 1 + .../pubsub/message_listener/inventory_test.rb | 11 ++- .../test/google/cloud/pubsub/service_test.rb | 20 +++++- .../test/google/cloud/pubsub_test.rb | 36 ++++++---- google-cloud-pubsub/test/helper.rb | 7 +- 16 files changed, 189 insertions(+), 97 deletions(-) create mode 100644 google-cloud-pubsub-v1/.owlbot.rb rename google-cloud-pubsub/lib/google/cloud/pubsub/{logger_helper.rb => logging.rb} (50%) diff --git a/google-cloud-pubsub-v1/.owlbot.rb b/google-cloud-pubsub-v1/.owlbot.rb new file mode 100644 index 000000000000..bf05438745b3 --- /dev/null +++ b/google-cloud-pubsub-v1/.owlbot.rb @@ -0,0 +1,46 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content| + content.gsub! "# @return [::Logger,:default,nil]", + "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content| + content.gsub! "# @return [::Logger,:default,nil]", + "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content| + content.gsub! "# @return [::Logger,:default,nil]", + "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content| + content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", + "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content| + content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", + "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" +end +OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content| + content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", + "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" +end + +# Add google-logging-utils as a dependency +OwlBot.modifier path: "google-cloud-pubsub-v1.gemspec" do |content| + content.gsub!(/ +end\z/, "\n gem.add_dependency \"google-logging-utils\", \"~> 0.3.0\"\nend") +end + +OwlBot.move_files diff --git a/google-cloud-pubsub/Gemfile b/google-cloud-pubsub/Gemfile index f3bbfc6b53f7..6c96ff69ee64 100644 --- a/google-cloud-pubsub/Gemfile +++ b/google-cloud-pubsub/Gemfile @@ -35,3 +35,4 @@ gem "retriable", "~> 3.1.2" gem "simplecov", "~> 0.22" gem "yard", "~> 0.9.37" gem "yard-doctest", "~> 0.1.17" +gem "google-logging-utils", "~> 0.3.0" diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 310cc4bca0f8..fc1b3d7e32ab 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -117,17 +117,30 @@ To disable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK export GOOGLE_SDK_RUBY_LOGGING_GEMS=pubsub ``` -You can programmatically configure a custom logger: +You can programmatically configure a custom logger. The logger can be set globally for the Pub/Sub library, or provided on a per-client basis. + +To set a logger globally, configure it on the `Google::Cloud` configuration object: ```ruby require "google/cloud/pubsub" require "logger" -# Configure a logger for the pubsub library +# Configure a global logger for the pubsub library Google::Cloud.configure.pubsub.logger = Logger.new "my-app.log" ``` -If the custom logger is not configured, it will default to a standard stdout logger. +Alternatively, you can provide a logger directly to the `PubSub` client initializer. If a logger instance is provided, it will override any globally configured logger. + +```ruby +require "google/cloud/pubsub" +require "logger" + +# Provide a logger directly to the client +custom_logger = Logger.new "pubsub-client.log" +pubsub = Google::Cloud::PubSub.new logger: custom_logger +``` + +If no custom logger is configured, a default logger that writes to standard output will be used. ## Supported Ruby Versions diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub.rb b/google-cloud-pubsub/lib/google/cloud/pubsub.rb index 0963999262a7..1c4eead30166 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub.rb @@ -14,6 +14,7 @@ require "google-cloud-pubsub" +require "google/cloud/pubsub/logging" require "google/cloud/pubsub/project" require "google/cloud/config" require "google/cloud/env" @@ -70,11 +71,16 @@ module PubSub # # * `https://www.googleapis.com/auth/pubsub` # @param [Numeric] timeout Default timeout to use in requests. Optional. + # @param [String] universe_domain A custom universe domain. Optional. # @param [String] endpoint Override of the endpoint host name. Optional. # If the param is nil, uses the default endpoint. # @param [String] emulator_host Pub/Sub emulator host. Optional. # If the param is nil, uses the value of the `emulator_host` config. - # @param universe_domain [String] A custom universe domain. Optional. + # @param [Logger] logger Optional Logger instance for emitting + # library-level debug logs. If not provided, it will default to + # configure.logger, which defaults to Logger.new STDOUT if not set. To + # enable logging, set environment variable GOOGLE_SDK_RUBY_LOGGING_GEMS + # to "all" or a comma separated list of gem names, including "pubsub". # # @return [Google::Cloud::PubSub::Project] # @@ -92,13 +98,15 @@ def self.new project_id: nil, timeout: nil, universe_domain: nil, endpoint: nil, - emulator_host: nil + emulator_host: nil, + logger: nil project_id ||= default_project_id scope ||= configure.scope timeout ||= configure.timeout endpoint ||= configure.endpoint universe_domain ||= configure.universe_domain emulator_host ||= configure.emulator_host + logger ||= configure.logger if emulator_host credentials = :this_channel_is_insecure @@ -114,10 +122,12 @@ def self.new project_id: nil, project_id = project_id.to_s # Always cast to a string raise ArgumentError, "project_id is missing" if project_id.empty? + logging = Google::Cloud::PubSub::Logging.create logger service = PubSub::Service.new project_id, credentials, host: endpoint, timeout: timeout, - universe_domain: universe_domain + universe_domain: universe_domain, + logging: logging PubSub::Project.new service end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb index aec834e510b0..161103b546bc 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb @@ -21,7 +21,6 @@ require "google/cloud/pubsub/publish_result" require "google/cloud/pubsub/service" require "google/cloud/pubsub/convert" -require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -60,7 +59,6 @@ module PubSub # class AsyncPublisher include MonitorMixin - include Google::Cloud::PubSub::LoggerHelper attr_reader :topic_name attr_reader :max_bytes @@ -381,7 +379,7 @@ def publish_batch_sync topic_name, batch, reason: "unknown" grpc = @service.publish topic_name, items.map(&:msg), compress: compress && batch.total_message_bytes >= compression_bytes_threshold - log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize) + service.logging.log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize) items.zip Array(grpc.message_ids) do |item, id| @flow_controller.release item.bytesize next unless item.callback diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb index 1d718d3c58f8..94cd0505cd1c 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb @@ -14,7 +14,6 @@ require "google/cloud/pubsub/convert" -require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -36,7 +35,6 @@ module PubSub # end # class BatchPublisher - include Google::Cloud::PubSub::LoggerHelper ## # @private The messages to publish @@ -124,7 +122,7 @@ def publish_batch_messages topic_name, service, reason: "unknown" grpc = service.publish topic_name, messages, compress: compress && total_message_bytes >= compression_bytes_threshold - log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes + service.logging.log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes to_gcloud_messages Array(grpc.message_ids) end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb similarity index 50% rename from google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb rename to google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb index 92c908245dd2..9cf3299a6242 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/logger_helper.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb @@ -12,81 +12,67 @@ # See the License for the specific language governing permissions and # limitations under the License. require "logger" +require "google/logging/google_sdk_logger_delegator" +require "google/cloud/config" module Google module Cloud module PubSub - LOG_NAME = "pubsub".freeze - private_constant :LOG_NAME - - ## - # The logger for the Pub/Sub library. - # @return [Logger] - def self.logger - is_logging_enabled ? configure.logger : Logger.new(nil) - end - - ## - # Checks if logging is enabled for the Pub/Sub library. - # @return [Boolean] - def self.is_logging_enabled - begin - env_var = ENV["GOOGLE_SDK_RUBY_LOGGING_GEMS"] - return false if (configure.logger.nil? || env_var == "none") - return true if env_var == "all" - # parse env var by removing whitespace and splitting by comma - packages = env_var&.gsub(/\s+/, "")&.split(",") || [] - packages.include?(LOG_NAME) - rescue StandardError => e - warn "Failed to determine logging configuration. Logging disabled. Error: #{e.class}: #{e.message}" - false - end - end - ## # @private - module LoggerHelper + class Logging + LOG_NAME = "pubsub".freeze VALID_LOG_LEVELS = [:debug, :info, :warn, :error, :fatal].freeze - private_constant :VALID_LOG_LEVELS + private_constant :VALID_LOG_LEVELS, :LOG_NAME - private + ## + # @private + def self.create logger + new logger + end + ## + # @private # rubocop:disable Naming/BlockForwarding - def log_safely level, subtag, &message_block + def log level, subtag, &message_block return unless VALID_LOG_LEVELS.include?(level) && block_given? - begin - Google::Cloud::PubSub.logger.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block) - rescue StandardError - # Ignore all logging errors. - end + @logging_delegator.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block) end # rubocop:enable Naming/BlockForwarding + ## + # @private def log_batch logger_name, reason, type, num_messages, total_bytes - log_safely :info, logger_name do + log :info, logger_name do "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes" end end + ## + # @private def log_ack_nack ack_ids, type - # exit early to avoid unnecessary loop - return unless Google::Cloud::PubSub.is_logging_enabled ack_ids.each do |ack_id| - log_safely :info, "ack-nack" do + log :info, "ack-nack" do "message (ackID #{ack_id}) #{type}" end end end + ## + # @private def log_expiry expired - # exit early to avoid unnecessary loop - return unless Google::Cloud::PubSub.is_logging_enabled expired.each do |ack_id, item| - log_safely :info, "expiry" do + log :info, "expiry" do "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout" end end end + + private + + def initialize logger + @logging_delegator = Google::Logging::GoogleSdkLoggerDelegator.new LOG_NAME, logger + end end end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index 2a3e61eb1e59..950f528d8a66 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -require "google/cloud/pubsub/logger_helper" require "monitor" module Google @@ -29,7 +28,6 @@ def self.from rec_msg end include MonitorMixin - include LoggerHelper attr_reader :stream attr_reader :limit @@ -88,7 +86,7 @@ def remove_expired! extension_time = Time.new - extension expired, keep = @inventory.partition { |_ack_id, item| item.pulled_at < extension_time } @inventory = keep.to_h - log_expiry expired + stream.subscriber.service.logging.log_expiry expired @wait_cond.broadcast end end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index bfd4a0432b4d..846b61b5949b 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -require "google/cloud/pubsub/logger_helper" require "google/cloud/pubsub/message_listener/sequencer" require "google/cloud/pubsub/message_listener/enumerator_queue" require "google/cloud/pubsub/message_listener/inventory" @@ -29,7 +28,6 @@ class MessageListener # @private class Stream include MonitorMixin - include LoggerHelper ## # @private Implementation attributes. @@ -75,7 +73,7 @@ def initialize subscriber ) do # push empty request every 30 seconds to keep stream alive unless inventory.empty? - log_safely :info, "subscriber-streams" do + subscriber.service.logging.log :info, "subscriber-streams" do "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" end push Google::Cloud::PubSub::V1::StreamingPullRequest.new @@ -99,7 +97,7 @@ def stop synchronize do break if @stopped - log_safely :info, "subscriber-streams" do + subscriber.service.logging.log :info, "subscriber-streams" do "stopping stream for subscription #{@subscriber.subscription_name}" end # Close the stream by pushing the sentinel value. @@ -227,7 +225,7 @@ def background_run synchronize do # Don't allow a stream to restart if already stopped if @stopped - log_safely :debug, "subscriber-streams" do + subscriber.service.logging.log :debug, "subscriber-streams" do "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ " stopped" end @@ -250,7 +248,7 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } enum = @subscriber.service.streaming_pull @request_queue.each, options - log_safely :info, "subscriber-streams" do + subscriber.service.logging.log :info, "subscriber-streams" do "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" end @@ -309,19 +307,19 @@ def background_run GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable => e status_code = e.respond_to?(:code) ? e.code : e.class.name - log_safely :error, "subscriber-streams" do + subscriber.service.logging.log :error, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended with status " \ "#{status_code}; will be retried." end # Restart the stream with an incremental back for a retriable error. retry rescue RestartStream - log_safely :info, "subscriber-streams" do + subscriber.service.logging.log :info, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." end retry rescue StandardError => e - log_safely :error, "subscriber-streams" do + subscriber.service.logging.log :error, "subscriber-streams" do "error on stream for subscription #{@subscriber.subscription_name}: #{e.inspect}" end @subscriber.error! e @@ -373,12 +371,12 @@ def perform_callback_async rec_msg end def perform_callback_sync rec_msg - log_safely :info, "callback-delivery" do + subscriber.service.logging.log :info, "callback-delivery" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" end @subscriber.callback.call rec_msg unless stopped? rescue StandardError => e - log_safely :info, "callback-exceptions" do + subscriber.service.logging.log :info, "callback-exceptions" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ "#{e.inspect}" end @@ -408,7 +406,7 @@ def pause_streaming! return unless pause_streaming? @paused = true - log_safely :info, "subscriber-flow-control" do + subscriber.service.logging.log :info, "subscriber-flow-control" do "subscriber for #{@subscriber.subscription_name} is client-side flow control blocked" end end @@ -424,7 +422,7 @@ def unpause_streaming! return unless unpause_streaming? @paused = nil - log_safely :info, "subscriber-flow-control" do + subscriber.service.logging.log :info, "subscriber-flow-control" do "subscriber for #{@subscriber.subscription_name} is unblocking client-side flow control" end # signal to the background thread that we are unpaused diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb index 58b4ab468cb0..b2644cff5490 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/timed_unary_buffer.rb @@ -18,7 +18,6 @@ require "google/cloud/pubsub/acknowledge_result" require "monitor" require "retriable" -require "google/cloud/pubsub/logger_helper" module Google module Cloud @@ -28,7 +27,6 @@ class MessageListener # @private class TimedUnaryBuffer include MonitorMixin - include Google::Cloud::PubSub::LoggerHelper attr_reader :max_bytes attr_reader :interval @@ -322,7 +320,7 @@ def flush_requests! reason reason end requests[:acknowledge].each do |req| - log_batch "ack-batch", new_reason, "ack", req.ack_ids.length, req.to_proto.bytesize + @subscriber.service.logging.log_batch "ack-batch", new_reason, "ack", req.ack_ids.length, req.to_proto.bytesize end end requests[:modify_ack_deadline] = @@ -331,7 +329,7 @@ def flush_requests! reason type = mod_deadline.zero? ? "nack" : "modack" new_reason = mod_ack_reqs.length > 1 ? "#{reason} and partitioned for exceeding max bytes" : reason mod_ack_reqs.each do |req| - log_batch "ack-batch", new_reason, type, req.ack_ids.length, req.to_proto.bytesize + @subscriber.service.logging.log_batch "ack-batch", new_reason, type, req.ack_ids.length, req.to_proto.bytesize end mod_ack_reqs end.flatten diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb index a992f27a14f0..06faf3bfe30b 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/service.rb @@ -19,7 +19,7 @@ require "google/cloud/pubsub/version" require "google/cloud/pubsub/v1" require "google/cloud/pubsub/admin_clients" -require "google/cloud/pubsub/logger_helper" +require "google/cloud/pubsub/logging" require "securerandom" module Google @@ -28,7 +28,6 @@ module PubSub ## # @private Represents the Pub/Sub service API, including IAM mixins. class Service - include LoggerHelper attr_accessor :project attr_accessor :credentials @@ -43,15 +42,20 @@ class Service attr_reader :universe_domain + ## + # @private The Logging object. + attr_reader :logging + ## # Creates a new Service instance. - def initialize project, credentials, host: nil, timeout: nil, universe_domain: nil + def initialize project, credentials, host: nil, timeout: nil, universe_domain: nil, logging: nil @project = project @credentials = credentials @host = host @timeout = timeout @client_id = SecureRandom.uuid.freeze @universe_domain = universe_domain || ENV["GOOGLE_CLOUD_UNIVERSE_DOMAIN"] || "googleapis.com" + @logging = logging end def subscription_admin @@ -144,7 +148,7 @@ def streaming_pull request_enum, options = {} ## # Acknowledges receipt of a message. def acknowledge subscription, *ack_ids - log_ack_nack ack_ids, "ack" + logging.log_ack_nack ack_ids, "ack" subscription_admin.acknowledge_internal subscription: subscription_path(subscription), ack_ids: ack_ids end @@ -153,7 +157,7 @@ def acknowledge subscription, *ack_ids # Modifies the ack deadline for a specific message. def modify_ack_deadline subscription, ids, deadline if deadline.zero? - log_ack_nack Array(ids), "nack" + logging.log_ack_nack Array(ids), "nack" end subscription_admin.modify_ack_deadline_internal subscription: subscription_path(subscription), ack_ids: Array(ids), @@ -200,6 +204,7 @@ def override_client_config_timeouts config rpc.timeout = timeout if rpc.respond_to? :timeout= end end + end end Pubsub = PubSub unless const_defined? :Pubsub diff --git a/google-cloud-pubsub/samples/Gemfile b/google-cloud-pubsub/samples/Gemfile index c537774d0a21..86b171a01b45 100644 --- a/google-cloud-pubsub/samples/Gemfile +++ b/google-cloud-pubsub/samples/Gemfile @@ -37,4 +37,5 @@ group :test do gem "rack-test" gem "rake" gem "toys-core" + gem "google-logging-utils", "~>0.3.0" end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb index fec618a2e312..6383daf662fa 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb @@ -217,8 +217,13 @@ end it "removes expired items" do - subscriber_mock = Minitest::Mock.new - inventory = Google::Cloud::PubSub::MessageListener::Inventory.new subscriber_mock, + logging_mock = Minitest::Mock.new + logging_mock.expect :log_expiry, nil, [Array] + service_mock = OpenStruct.new logging: logging_mock + listener_mock = OpenStruct.new service: service_mock + stream_mock = OpenStruct.new subscriber: listener_mock + + inventory = Google::Cloud::PubSub::MessageListener::Inventory.new stream_mock, limit: 1000, bytesize: 100_000, extension: 3600, @@ -237,6 +242,8 @@ inventory.remove_expired! _(inventory.ack_ids).must_equal ["ack-id-1112", "ack-id-1113"] + + logging_mock.verify end it "knows its max_duration_per_lease_extension limit" do diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb index 8f065f4e1a8d..90bc34fbb254 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/service_test.rb @@ -15,6 +15,7 @@ require "helper" require "gapic/grpc/service_stub" require "google/cloud/pubsub/v1" +require "logger" describe Google::Cloud::PubSub::Service do class PubSubServiceTestDummyStub @@ -35,6 +36,20 @@ def logger end end + class MockLogging + def log_ack_nack ack_ids, type + # Do nothing + end + + def log_batch logger_name, reason, type, num_messages, total_bytes + # Do nothing + end + + def log_expiry expired + # Do nothing + end + end + let(:project) { "test" } let(:credentials) { :this_channel_is_insecure } let(:timeout) { 123.4 } @@ -42,6 +57,7 @@ def logger let(:endpoint_2) { "localhost:4567" } let(:universe_domain) { "googleapis.com" } let(:universe_domain_2) { "mydomain.com" } + let(:mock_logging) { MockLogging.new } # Values below are hardcoded in Service. let(:lib_name) { "gccl" } @@ -203,13 +219,13 @@ def logger end it "should raise errors other than grpc on ack" do - service = Google::Cloud::PubSub::Service.new project, nil + service = Google::Cloud::PubSub::Service.new project, nil, logging: mock_logging mocked_subscription_admin = Minitest::Mock.new service.mocked_subscription_admin = mocked_subscription_admin def mocked_subscription_admin.acknowledge_internal *args raise RuntimeError.new "test" end - assert_raises RuntimeError do + assert_raises RuntimeError do service.acknowledge "sub","ack_id" end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub_test.rb index aa5009c316b3..546d36dc37db 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub_test.rb @@ -97,12 +97,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -152,12 +153,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -184,12 +186,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -231,12 +234,13 @@ def creds.is_a? target it "allows timeout to be set" do timeout = 123.4 - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_equal timeout _(host).must_be :nil? _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging } # Clear all environment variables @@ -255,12 +259,13 @@ def creds.is_a? target it "allows endpoint to be set" do endpoint = "localhost:4567" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_be :nil? _(host).must_equal endpoint _(universe_domain).must_be :nil? + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project, credentials: credentials } @@ -283,12 +288,13 @@ def creds.is_a? target it "allows universe_domain to be set" do actual_universe_domain = "myuniverse.com" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, logging: nil) { _(project).must_equal "project-id" _(credentials).must_equal default_credentials _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_equal actual_universe_domain + _(logging).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project, credentials: credentials } @@ -331,13 +337,14 @@ def creds.is_a? target _(scope).must_equal default_scopes OpenStruct.new project_id: "project-id" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_be_kind_of OpenStruct _(credentials.project_id).must_equal "project-id" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } empty_env = OpenStruct.new @@ -375,12 +382,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -413,12 +421,13 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_be :nil? _(host).must_be :nil? _(universe_domain).must_be :nil? + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -451,10 +460,11 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_equal 42.0 + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -488,10 +498,11 @@ def creds.is_a? target _(scope).must_equal default_scopes "pubsub-credentials" } - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(project).must_equal "project-id" _(credentials).must_equal "pubsub-credentials" _(timeout).must_equal 42.0 + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } @@ -523,9 +534,10 @@ def creds.is_a? target actual_endpoint = "myendpoint.com" actual_universe_domain = "mydomain.com" - stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil) { + stubbed_service = ->(project, credentials, timeout: nil, host: nil, universe_domain: nil, **kwargs) { _(host).must_equal actual_endpoint _(universe_domain).must_equal actual_universe_domain + _(kwargs[:logging]).must_be_kind_of Google::Cloud::PubSub::Logging OpenStruct.new project: project } diff --git a/google-cloud-pubsub/test/helper.rb b/google-cloud-pubsub/test/helper.rb index e5b42464bb68..3d19a6cd58ed 100644 --- a/google-cloud-pubsub/test/helper.rb +++ b/google-cloud-pubsub/test/helper.rb @@ -107,7 +107,12 @@ def message_hash class MockPubsub < Minitest::Spec let(:project) { "test" } let(:credentials) { OpenStruct.new(client: OpenStruct.new(updater_proc: Proc.new {})) } - let(:pubsub) { Google::Cloud::PubSub::Project.new(Google::Cloud::PubSub::Service.new(project, credentials)) } + let(:pubsub) do + logger = Google::Cloud.configure.pubsub.logger || Logger.new(STDOUT) + logging = Google::Cloud::PubSub::Logging.create logger + service = Google::Cloud::PubSub::Service.new project, credentials, logging: logging + Google::Cloud::PubSub::Project.new service + end def topics_hash num_topics, token = "" topics = num_topics.times.map do From 38f281b1ef5a89336406f8d9e2dbac92a2fbb32c Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Wed, 17 Dec 2025 16:35:57 -0800 Subject: [PATCH 10/10] chore(PubSub): owlbot.rb changes upon testing --- google-cloud-pubsub-v1/.owlbot.rb | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/google-cloud-pubsub-v1/.owlbot.rb b/google-cloud-pubsub-v1/.owlbot.rb index bf05438745b3..b613edf662c4 100644 --- a/google-cloud-pubsub-v1/.owlbot.rb +++ b/google-cloud-pubsub-v1/.owlbot.rb @@ -13,34 +13,13 @@ # limitations under the License. OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content| - content.gsub! "# @return [::Logger,:default,nil]", - "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" end OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content| - content.gsub! "# @return [::Logger,:default,nil]", - "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" end OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content| - content.gsub! "# @return [::Logger,:default,nil]", - "# @return [::Logger,::Google::Logging::GoogleSdkLoggerDelegator,:default,nil]" -end -OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content| - content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", - "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" -end -OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content| - content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", - "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" -end -OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content| - content.gsub! "config_attr :logger, :default, ::Logger, nil, :default", - "config_attr :logger, :default, [::Logger, ::Google::Logging::GoogleSdkLoggerDelegator], nil, :default" -end - -# Add google-logging-utils as a dependency -OwlBot.modifier path: "google-cloud-pubsub-v1.gemspec" do |content| - content.gsub!(/ -end\z/, "\n gem.add_dependency \"google-logging-utils\", \"~> 0.3.0\"\nend") + content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator" end OwlBot.move_files