From 05b28bfca5ab7b3f0f8bc4d7dd4a04fe285140c1 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 25 Dec 2023 09:42:12 +0100 Subject: [PATCH 1/8] wip --- lib/mongo/monitoring.rb | 4 ++ lib/mongo/monitoring/open_telemetry_tracer.rb | 68 +++++++++++++++++++ lib/mongo/operation/shared/executable.rb | 9 ++- 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 lib/mongo/monitoring/open_telemetry_tracer.rb diff --git a/lib/mongo/monitoring.rb b/lib/mongo/monitoring.rb index 8c0be2c8fa..a5ff7a1da9 100644 --- a/lib/mongo/monitoring.rb +++ b/lib/mongo/monitoring.rb @@ -236,8 +236,11 @@ def initialize(options = {}) subscribe(TOPOLOGY_CHANGED, TopologyChangedLogSubscriber.new(options)) subscribe(TOPOLOGY_CLOSED, TopologyClosedLogSubscriber.new(options)) end + @tracer = options[:otel_tracer] || OpenTelemetryTracer.new end + attr_reader :tracer + # @api private attr_reader :options @@ -369,6 +372,7 @@ def initialize_copy(original) require 'mongo/monitoring/publishable' require 'mongo/monitoring/command_log_subscriber' require 'mongo/monitoring/cmap_log_subscriber' +require 'mongo/monitoring/open_telemetry_tracer' require 'mongo/monitoring/sdam_log_subscriber' require 'mongo/monitoring/server_description_changed_log_subscriber' require 'mongo/monitoring/server_closed_log_subscriber' diff --git a/lib/mongo/monitoring/open_telemetry_tracer.rb b/lib/mongo/monitoring/open_telemetry_tracer.rb new file mode 100644 index 0000000000..96f0fc881d --- /dev/null +++ b/lib/mongo/monitoring/open_telemetry_tracer.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true +# rubocop:todo all + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Monitoring + + # Subscribes to command events and traces them to OpenTelemetry. + # + # @api private + class OpenTelemetryTracer + + ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' + + OTEL_TRACER_NAME = 'mongo-ruby-driver' + + def initialize(options = {}) + if defined?(::OpenTelemetry) && ENV[ENV_VARIABLE_ENABLED] != 'true' + @tracer = (options[:opentelemetry_tracer_provider] || ::OpenTelemetry.tracer_provider).tracer( + OTEL_TRACER_NAME, Mongo::VERSION + ) + end + end + + def in_span(message, operation, address) + attributes = { + 'db.system' => 'mongodb', + 'db.name' => operation.spec[:db_name], + 'db.operation' => message.payload[:command_name], + 'net.peer.name' => address.host, + 'net.peer.port' => address.port + } + @tracer.in_span(span_name(message, operation), attributes: attributes) do |span| + yield(span) + end + end + + def enabled? + @tracer != nil + end + + private + + def span_name(message, operation) + collection = operation.spec[:coll_name] + command_name = message.payload[:command_name] + if collection + "#{collection}.#{command_name}" + else + command_name + end + end + end + end +end diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 229479e24b..0abfece12c 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -96,7 +96,14 @@ def get_result(connection, context, options = {}) def dispatch_message(connection, context, options = {}) message = build_message(connection, context) message = message.maybe_encrypt(connection, context) - reply = connection.dispatch([ message ], context, options) + if connection.server.cluster.monitoring&.tracer&.enabled? + reply = nil + connection.server.cluster.monitoring.tracer.in_span(message, self, connection.address) do |span| + reply = connection.dispatch([ message ], context, options) + end + else + reply = connection.dispatch([ message ], context, options) + end [reply, connection.description, connection.global_id] end From fd88568c3c7a4567ecce57c12c4efb2dd836cef2 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 29 Dec 2023 16:20:58 +0200 Subject: [PATCH 2/8] Add tests --- lib/mongo/monitoring.rb | 4 +- lib/mongo/monitoring/open_telemetry.rb | 28 +++++ .../open_telemetry/statement_builder.rb | 73 ++++++++++++ lib/mongo/monitoring/open_telemetry/tracer.rb | 95 ++++++++++++++++ lib/mongo/monitoring/open_telemetry_tracer.rb | 68 ------------ .../open_telemetry/statement_builder_spec.rb | 104 ++++++++++++++++++ 6 files changed, 302 insertions(+), 70 deletions(-) create mode 100644 lib/mongo/monitoring/open_telemetry.rb create mode 100644 lib/mongo/monitoring/open_telemetry/statement_builder.rb create mode 100644 lib/mongo/monitoring/open_telemetry/tracer.rb delete mode 100644 lib/mongo/monitoring/open_telemetry_tracer.rb create mode 100644 spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb diff --git a/lib/mongo/monitoring.rb b/lib/mongo/monitoring.rb index a5ff7a1da9..f3df93a4fc 100644 --- a/lib/mongo/monitoring.rb +++ b/lib/mongo/monitoring.rb @@ -236,7 +236,7 @@ def initialize(options = {}) subscribe(TOPOLOGY_CHANGED, TopologyChangedLogSubscriber.new(options)) subscribe(TOPOLOGY_CLOSED, TopologyClosedLogSubscriber.new(options)) end - @tracer = options[:otel_tracer] || OpenTelemetryTracer.new + @tracer = options[:otel_tracer] || OpenTelemetry::Tracer.new end attr_reader :tracer @@ -372,7 +372,7 @@ def initialize_copy(original) require 'mongo/monitoring/publishable' require 'mongo/monitoring/command_log_subscriber' require 'mongo/monitoring/cmap_log_subscriber' -require 'mongo/monitoring/open_telemetry_tracer' +require 'mongo/monitoring/open_telemetry' require 'mongo/monitoring/sdam_log_subscriber' require 'mongo/monitoring/server_description_changed_log_subscriber' require 'mongo/monitoring/server_closed_log_subscriber' diff --git a/lib/mongo/monitoring/open_telemetry.rb b/lib/mongo/monitoring/open_telemetry.rb new file mode 100644 index 0000000000..e8ae23401e --- /dev/null +++ b/lib/mongo/monitoring/open_telemetry.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Monitoring + # @api private + module OpenTelemetry + + end + end +end + + +require 'mongo/monitoring/open_telemetry/statement_builder' +require 'mongo/monitoring/open_telemetry/tracer' diff --git a/lib/mongo/monitoring/open_telemetry/statement_builder.rb b/lib/mongo/monitoring/open_telemetry/statement_builder.rb new file mode 100644 index 0000000000..7510b0a6ad --- /dev/null +++ b/lib/mongo/monitoring/open_telemetry/statement_builder.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Monitoring + # Subscribes to command events and traces them to OpenTelemetry. + # + # @api private + module OpenTelemetry + # This class is used to build a +db.statement+ attribute for an OpenTelemetry span + # from a MongoDB command. + class StatementBuilder + # @param [ BSON::Document ] command The message that will be + # sent to the server. + # @param [ Boolean ] obfuscate Whether to obfuscate the statement. + def initialize(command, obfuscate) + @command = command + @command_name, @collection = command.first + @obfuscate = obfuscate + end + + # Builds the statement. + # + # @return [ String ] The statement as a JSON string. + def build + statement.to_json.freeze unless statement.empty? + end + + private + + def statement + mask(@command) + end + + def mask(hash) + hash.each_with_object({}) do |(k, v), h| + next if Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) + + value = case v + when Hash then mask(v) + when Array then v.map { |e| mask(e) } + else mask_value(k, v) + end + h[k] = value + end + end + + def mask_value(key, value) + if key == @command_name && value == @collection + @collection + elsif @obfuscate + '?' + else + value + end + end + end + end + end +end diff --git a/lib/mongo/monitoring/open_telemetry/tracer.rb b/lib/mongo/monitoring/open_telemetry/tracer.rb new file mode 100644 index 0000000000..263124e97e --- /dev/null +++ b/lib/mongo/monitoring/open_telemetry/tracer.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Monitoring + module OpenTelemetry + class Tracer + ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' + + ENV_VARIABLE_DB_STATEMENT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DB_STATEMENT' + + DB_STATEMENT_DEFAULT_VALUE = 'obfuscate' + + DB_STATEMENT_VALUES = %i[omit obfuscate include] + + OTEL_TRACER_NAME = 'mongo-ruby-driver' + + def initialize(options = {}) + return unless defined?(::OpenTelemetry) && ENV[ENV_VARIABLE_ENABLED] != 'true' + + @tracer = (options[:opentelemetry_tracer_provider] || ::OpenTelemetry.tracer_provider).tracer( + OTEL_TRACER_NAME, Mongo::VERSION + ) + end + + def in_span(message, operation, address, &block) + @tracer.in_span( + span_name(message, operation), + attributes: attributes(message, operation, address), + kind: :client, + &block + ) + end + + def enabled? + @tracer != nil + end + + private + + def db_statement + @db_statement ||= ENV.fetch(ENV_VARIABLE_DB_STATEMENT, DB_STATEMENT_DEFAULT_VALUE).to_sym.tap do |statement| + unless DB_STATEMENT_VALUES.include?(statement) + raise ArgumentError, "Invalid value for #{ENV_VARIABLE_DB_STATEMENT}: #{statement}" + end + end + end + + def omit? + db_statement == :omit + end + + def obfuscate? + db_statement == :obfuscate + end + + def span_name(message, operation) + collection = operation.spec[:coll_name] + command_name = message.payload[:command_name] + if collection + "#{collection}.#{command_name}" + else + command_name + end + end + + def attributes(message, operation, address) + { + 'db.system' => 'mongodb', + 'db.name' => message.payload[:database_name], + 'db.operation' => message.payload[:command_name], + 'net.peer.name' => address.host, + 'net.peer.port' => address.port, + 'db.mongodb.collection' => operation.spec[:coll_name], + }.tap do |attributes| + attributes['db.statement'] = StatementBuilder.new(message, obfuscate?).build unless omit? + end + end + end + end + end +end diff --git a/lib/mongo/monitoring/open_telemetry_tracer.rb b/lib/mongo/monitoring/open_telemetry_tracer.rb deleted file mode 100644 index 96f0fc881d..0000000000 --- a/lib/mongo/monitoring/open_telemetry_tracer.rb +++ /dev/null @@ -1,68 +0,0 @@ -# frozen_string_literal: true -# rubocop:todo all - -# Copyright (C) 2015-present MongoDB Inc. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -module Mongo - class Monitoring - - # Subscribes to command events and traces them to OpenTelemetry. - # - # @api private - class OpenTelemetryTracer - - ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' - - OTEL_TRACER_NAME = 'mongo-ruby-driver' - - def initialize(options = {}) - if defined?(::OpenTelemetry) && ENV[ENV_VARIABLE_ENABLED] != 'true' - @tracer = (options[:opentelemetry_tracer_provider] || ::OpenTelemetry.tracer_provider).tracer( - OTEL_TRACER_NAME, Mongo::VERSION - ) - end - end - - def in_span(message, operation, address) - attributes = { - 'db.system' => 'mongodb', - 'db.name' => operation.spec[:db_name], - 'db.operation' => message.payload[:command_name], - 'net.peer.name' => address.host, - 'net.peer.port' => address.port - } - @tracer.in_span(span_name(message, operation), attributes: attributes) do |span| - yield(span) - end - end - - def enabled? - @tracer != nil - end - - private - - def span_name(message, operation) - collection = operation.spec[:coll_name] - command_name = message.payload[:command_name] - if collection - "#{collection}.#{command_name}" - else - command_name - end - end - end - end -end diff --git a/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb new file mode 100644 index 0000000000..86a9b637f9 --- /dev/null +++ b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb @@ -0,0 +1,104 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Mongo::Monitoring::OpenTelemetry::StatementBuilder do + shared_examples 'statement builder tests' do |obfuscate| + subject(:statement_builder) { described_class.new(command, obfuscate) } + + context 'with a find command' do + let(:command) { { 'find' => 'users', 'filter' => { 'name' => 'John' }, '$db' => 'test', 'lsid' => 'some_id' } } + + it 'ignores specified keys' do + statement = JSON.parse(statement_builder.build) + Mongo::Protocol::Msg::INTERNAL_KEYS.each do |key| + expect(statement).not_to have_key(key) + end + end + + it 'returns the correct statement' do + expected_filter = obfuscate ? { 'name' => '?' } : { 'name' => 'John' } + expected_statement = { 'find' => 'users', 'filter' => expected_filter }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with an aggregation pipeline' do + let(:pipeline) do + [ { '$match' => { 'status' => 'A' } }, + { '$group' => { '_id' => '$cust_id', 'total' => { '$sum' => '$amount' } } } ] + end + let(:command) { { 'aggregate' => 'orders', 'pipeline' => pipeline, '$db' => 'test' } } + + it 'returns the correct statement' do + expected_pipeline = if obfuscate + [ { '$match' => { 'status' => '?' } }, + { '$group' => { '_id' => '?', 'total' => { '$sum' => '?' } } } ] + else + pipeline + end + expected_statement = { 'aggregate' => 'orders', 'pipeline' => expected_pipeline }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with a findAndModify command' do + let(:command) do + { 'findAndModify' => 'users', 'query' => { 'name' => 'John' }, 'update' => { '$set' => { 'age' => 30 } }, + '$db' => 'test' } + end + + it 'returns the correct statement' do + expected_query = obfuscate ? { 'name' => '?' } : { 'name' => 'John' } + expected_update = obfuscate ? { '$set' => { 'age' => '?' } } : { '$set' => { 'age' => 30 } } + expected_statement = { 'findAndModify' => 'users', 'query' => expected_query, + 'update' => expected_update }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with an update command' do + let(:command) do + { 'update' => 'users', 'updates' => [ { 'q' => { 'name' => 'John' }, 'u' => { '$set' => { 'age' => 30 } } } ], + '$db' => 'test' } + end + + it 'returns the correct statement' do + expected_updates = if obfuscate + [ { 'q' => { 'name' => '?' }, + 'u' => { '$set' => { 'age' => '?' } } } ] + else + [ { 'q' => { 'name' => 'John' }, + 'u' => { '$set' => { 'age' => 30 } } } ] + end + expected_statement = { 'update' => 'users', 'updates' => expected_updates }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with a delete command' do + let(:command) do + { 'delete' => 'users', 'deletes' => [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ], '$db' => 'test' } + end + + it 'returns the correct statement' do + expected_deletes = if obfuscate + [ { 'q' => { 'name' => '?' }, + 'limit' => '?' } ] + else + [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ] + end + expected_statement = { 'delete' => 'users', 'deletes' => expected_deletes }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + end + + context 'when obfuscation is false' do + include_examples 'statement builder tests', false + end + + context 'when obfuscation is true' do + include_examples 'statement builder tests', true + end +end From 2ff3d63753a4a885028778340b3cad9b25b38b8a Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 29 Dec 2023 17:58:01 +0200 Subject: [PATCH 3/8] Fix rubocop complaints --- lib/mongo/monitoring/open_telemetry.rb | 2 - .../open_telemetry/statement_builder.rb | 147 +++++++++--------- lib/mongo/monitoring/open_telemetry/tracer.rb | 4 +- .../open_telemetry/statement_builder_spec.rb | 30 ++-- 4 files changed, 95 insertions(+), 88 deletions(-) diff --git a/lib/mongo/monitoring/open_telemetry.rb b/lib/mongo/monitoring/open_telemetry.rb index e8ae23401e..4e2a847269 100644 --- a/lib/mongo/monitoring/open_telemetry.rb +++ b/lib/mongo/monitoring/open_telemetry.rb @@ -18,11 +18,9 @@ module Mongo class Monitoring # @api private module OpenTelemetry - end end end - require 'mongo/monitoring/open_telemetry/statement_builder' require 'mongo/monitoring/open_telemetry/tracer' diff --git a/lib/mongo/monitoring/open_telemetry/statement_builder.rb b/lib/mongo/monitoring/open_telemetry/statement_builder.rb index 7510b0a6ad..a9f6f15eb3 100644 --- a/lib/mongo/monitoring/open_telemetry/statement_builder.rb +++ b/lib/mongo/monitoring/open_telemetry/statement_builder.rb @@ -1,73 +1,74 @@ -# frozen_string_literal: true - -# Copyright (C) 2015-present MongoDB Inc. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -module Mongo - class Monitoring - # Subscribes to command events and traces them to OpenTelemetry. - # - # @api private - module OpenTelemetry - # This class is used to build a +db.statement+ attribute for an OpenTelemetry span - # from a MongoDB command. - class StatementBuilder - # @param [ BSON::Document ] command The message that will be - # sent to the server. - # @param [ Boolean ] obfuscate Whether to obfuscate the statement. - def initialize(command, obfuscate) - @command = command - @command_name, @collection = command.first - @obfuscate = obfuscate - end - - # Builds the statement. - # - # @return [ String ] The statement as a JSON string. - def build - statement.to_json.freeze unless statement.empty? - end - - private - - def statement - mask(@command) - end - - def mask(hash) - hash.each_with_object({}) do |(k, v), h| - next if Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) - - value = case v - when Hash then mask(v) - when Array then v.map { |e| mask(e) } - else mask_value(k, v) - end - h[k] = value - end - end - - def mask_value(key, value) - if key == @command_name && value == @collection - @collection - elsif @obfuscate - '?' - else - value - end - end - end - end - end -end +# frozen_string_literal: true + +# +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Monitoring + # This module contains classes related to OpenTelemetry instrumentation. + # + # @api private + module OpenTelemetry + # This class is used to build a +db.statement+ attribute for an OpenTelemetry span + # from a MongoDB command. + class StatementBuilder + # @param [ BSON::Document ] command The message that will be + # sent to the server. + # @param [ Boolean ] obfuscate Whether to obfuscate the statement. + def initialize(command, obfuscate) + @command = command + @command_name, @collection = command.first + @obfuscate = obfuscate + end + + # Builds the statement. + # + # @return [ String ] The statement as a JSON string. + def build + statement.to_json.freeze unless statement.empty? + end + + private + + def statement + mask(@command) + end + + def mask(hash) + hash.each_with_object({}) do |(k, v), h| + next if Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) + + value = case v + when Hash then mask(v) + when Array then v.map { |e| mask(e) } + else mask_value(k, v) + end + h[k] = value + end + end + + def mask_value(key, value) + if key == @command_name && value == @collection + @collection + elsif @obfuscate + '?' + else + value + end + end + end + end + end +end diff --git a/lib/mongo/monitoring/open_telemetry/tracer.rb b/lib/mongo/monitoring/open_telemetry/tracer.rb index 263124e97e..7611c6394b 100644 --- a/lib/mongo/monitoring/open_telemetry/tracer.rb +++ b/lib/mongo/monitoring/open_telemetry/tracer.rb @@ -17,6 +17,8 @@ module Mongo class Monitoring module OpenTelemetry + # This is a wrapper around OpenTelemetry tracer that provides a convenient + # interface for creating spans. class Tracer ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' @@ -24,7 +26,7 @@ class Tracer DB_STATEMENT_DEFAULT_VALUE = 'obfuscate' - DB_STATEMENT_VALUES = %i[omit obfuscate include] + DB_STATEMENT_VALUES = %i[omit obfuscate include].freeze OTEL_TRACER_NAME = 'mongo-ruby-driver' diff --git a/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb index 86a9b637f9..74b3b15a69 100644 --- a/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb +++ b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb @@ -30,13 +30,16 @@ end let(:command) { { 'aggregate' => 'orders', 'pipeline' => pipeline, '$db' => 'test' } } + let(:expected_pipeline) do + if obfuscate + [ { '$match' => { 'status' => '?' } }, + { '$group' => { '_id' => '?', 'total' => { '$sum' => '?' } } } ] + else + pipeline + end + end + it 'returns the correct statement' do - expected_pipeline = if obfuscate - [ { '$match' => { 'status' => '?' } }, - { '$group' => { '_id' => '?', 'total' => { '$sum' => '?' } } } ] - else - pipeline - end expected_statement = { 'aggregate' => 'orders', 'pipeline' => expected_pipeline }.to_json.freeze expect(statement_builder.build).to eq(expected_statement) end @@ -81,13 +84,16 @@ { 'delete' => 'users', 'deletes' => [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ], '$db' => 'test' } end + let(:expected_deletes) do + if obfuscate + [ { 'q' => { 'name' => '?' }, + 'limit' => '?' } ] + else + [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ] + end + end + it 'returns the correct statement' do - expected_deletes = if obfuscate - [ { 'q' => { 'name' => '?' }, - 'limit' => '?' } ] - else - [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ] - end expected_statement = { 'delete' => 'users', 'deletes' => expected_deletes }.to_json.freeze expect(statement_builder.build).to eq(expected_statement) end From faabad5b2a5aecec28f494d229bf961154a0627a Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Sat, 6 Jan 2024 09:14:34 +0100 Subject: [PATCH 4/8] More specs --- gemfiles/open_telemetry.gemfile | 9 +++ lib/mongo/monitoring/open_telemetry/tracer.rb | 70 ++++++++++++++----- lib/mongo/operation/shared/executable.rb | 8 +-- .../monitoring/open_telemetry/tracer_spec.rb | 36 ++++++++++ 4 files changed, 100 insertions(+), 23 deletions(-) create mode 100644 gemfiles/open_telemetry.gemfile create mode 100644 spec/mongo/monitoring/open_telemetry/tracer_spec.rb diff --git a/gemfiles/open_telemetry.gemfile b/gemfiles/open_telemetry.gemfile new file mode 100644 index 0000000000..7aba60643d --- /dev/null +++ b/gemfiles/open_telemetry.gemfile @@ -0,0 +1,9 @@ +# rubocop:todo all +source "https://rubygems.org" +gemspec path: '..' + +gem 'opentelemetry-sdk' + +require_relative './standard' + +standard_dependencies diff --git a/lib/mongo/monitoring/open_telemetry/tracer.rb b/lib/mongo/monitoring/open_telemetry/tracer.rb index 7611c6394b..80627aad3e 100644 --- a/lib/mongo/monitoring/open_telemetry/tracer.rb +++ b/lib/mongo/monitoring/open_telemetry/tracer.rb @@ -20,39 +20,71 @@ module OpenTelemetry # This is a wrapper around OpenTelemetry tracer that provides a convenient # interface for creating spans. class Tracer - ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' - + # Environment variable that enables otel instrumentation. + ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED' + + # Environment variable that controls the db.statement attribute. + # Possible values are: + # - omit: do not include db.statement attribute + # - obfuscate: obfuscate the attribute value + # - include: include the attribute value as is + # Default value is obfuscate. ENV_VARIABLE_DB_STATEMENT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DB_STATEMENT' + # Default value for db.statement attribute. DB_STATEMENT_DEFAULT_VALUE = 'obfuscate' + # Possible values for db.statement attribute. DB_STATEMENT_VALUES = %i[omit obfuscate include].freeze + # Name of the tracer. OTEL_TRACER_NAME = 'mongo-ruby-driver' - def initialize(options = {}) - return unless defined?(::OpenTelemetry) && ENV[ENV_VARIABLE_ENABLED] != 'true' + # @return [ OpenTelemetry::SDK::Trace::Tracer | nil ] The otel tracer. + attr_reader :ot_tracer + + def initialize + return unless defined?(::OpenTelemetry) + return unless ENV[ENV_VARIABLE_ENABLED] == 'true' - @tracer = (options[:opentelemetry_tracer_provider] || ::OpenTelemetry.tracer_provider).tracer( - OTEL_TRACER_NAME, Mongo::VERSION + @ot_tracer = ::OpenTelemetry.tracer_provider.tracer( + OTEL_TRACER_NAME, + Mongo::VERSION ) end + # If otel instrumentation is enabled, creates a span with attributes + # for the message and operation and yields it to the block. + # Otherwise, yields to the block. + # + # @param [ Protocol::Message ] message The message. + # @param [ Operation ] operation The operation. + # @param [ Address ] address The address of the server the message is sent to. + # @param [ Proc ] &block The block to be executed. def in_span(message, operation, address, &block) - @tracer.in_span( - span_name(message, operation), - attributes: attributes(message, operation, address), - kind: :client, - &block - ) + if enabled? + @ot_tracer.in_span( + span_name(message, operation), + attributes: attributes(message, operation, address), + kind: :client, + &block + ) + else + yield + end end + private + + # @return [ true | false ] Whether otel instrumentation is enabled. def enabled? - @tracer != nil + @ot_tracer != nil end - private - + # Validates and returns the value of db.statement attribute of the span. + # + # @return [ Symbol ] The value of db.statement attribute. + # @raise [ ArgumentError ] If the value is invalid. def db_statement @db_statement ||= ENV.fetch(ENV_VARIABLE_DB_STATEMENT, DB_STATEMENT_DEFAULT_VALUE).to_sym.tap do |statement| unless DB_STATEMENT_VALUES.include?(statement) @@ -61,14 +93,17 @@ def db_statement end end + # @return [ true | false ] Whether db.statement attribute should be omitted. def omit? db_statement == :omit end + # @return [ true | false ] Whether db.statement attribute should be obfuscated. def obfuscate? db_statement == :obfuscate end + # @return [ String ] The name of the span. def span_name(message, operation) collection = operation.spec[:coll_name] command_name = message.payload[:command_name] @@ -79,6 +114,7 @@ def span_name(message, operation) end end + # @return [ Hash ] The attributes of the span. def attributes(message, operation, address) { 'db.system' => 'mongodb', @@ -86,9 +122,9 @@ def attributes(message, operation, address) 'db.operation' => message.payload[:command_name], 'net.peer.name' => address.host, 'net.peer.port' => address.port, - 'db.mongodb.collection' => operation.spec[:coll_name], }.tap do |attributes| - attributes['db.statement'] = StatementBuilder.new(message, obfuscate?).build unless omit? + attributes['db.mongodb.collection'] = operation.spec[:coll_name] unless operation.spec[:coll_name].nil? + attributes['db.statement'] = StatementBuilder.new(message.payload[:command], obfuscate?).build unless omit? end end end diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 0abfece12c..4aa19825bd 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -96,12 +96,8 @@ def get_result(connection, context, options = {}) def dispatch_message(connection, context, options = {}) message = build_message(connection, context) message = message.maybe_encrypt(connection, context) - if connection.server.cluster.monitoring&.tracer&.enabled? - reply = nil - connection.server.cluster.monitoring.tracer.in_span(message, self, connection.address) do |span| - reply = connection.dispatch([ message ], context, options) - end - else + reply = nil + connection.server.cluster.monitoring.tracer.in_span(message, self, connection.address) do reply = connection.dispatch([ message ], context, options) end [reply, connection.description, connection.global_id] diff --git a/spec/mongo/monitoring/open_telemetry/tracer_spec.rb b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb new file mode 100644 index 0000000000..40aab3fb36 --- /dev/null +++ b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb @@ -0,0 +1,36 @@ +require 'opentelemetry/sdk' +require 'lite_spec_helper' + +RSpec.describe Mongo::Monitoring::OpenTelemetry::Tracer do + let(:tracer) { described_class.new } + + describe '#initialize' do + context 'when not enabled' do + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return(nil) + end + + it 'does not create a tracer' do + expect(tracer.ot_tracer).to be_nil + end + end + end + + describe '#in_span' do + context 'when disabled' do + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return(nil) + end + + it 'yields to the block' do + expect { |b| tracer.in_span(nil, nil, nil, &b) }.to yield_control + end + end + + context 'when enabled' do + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return('true') + end + end + end +end From 5debf553f86f5e2f4aee9519502af478a20aae54 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 15 Jan 2024 17:39:17 +0100 Subject: [PATCH 5/8] wip --- .../monitoring/open_telemetry/tracer_spec.rb | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/spec/mongo/monitoring/open_telemetry/tracer_spec.rb b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb index 40aab3fb36..cf9b08bb5e 100644 --- a/spec/mongo/monitoring/open_telemetry/tracer_spec.rb +++ b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb @@ -14,20 +14,28 @@ expect(tracer.ot_tracer).to be_nil end end - end - describe '#in_span' do - context 'when disabled' do + context 'when enabled' do before do - allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return(nil) + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return('true') + end + + it 'creates the tracer' do + expect(tracer.ot_tracer).to be_a(OpenTelemetry::SDK::Trace::Tracer) end + end + end + describe '#in_span' do + context 'when not enabled' do it 'yields to the block' do expect { |b| tracer.in_span(nil, nil, nil, &b) }.to yield_control end end context 'when enabled' do + let(:ot_tracer) { instance_double(OpenTelemetry::SDK::Trace::Tracer) } + before do allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return('true') end From 6c9a14acbdc528f87d1c8cf6ee1b877466528ccb Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 13 Dec 2024 09:36:08 -0500 Subject: [PATCH 6/8] Another try --- lib/mongo.rb | 1 + lib/mongo/monitoring.rb | 4 - .../open_telemetry/statement_builder.rb | 74 ---------- lib/mongo/monitoring/open_telemetry/tracer.rb | 133 ------------------ lib/mongo/{monitoring => }/open_telemetry.rb | 13 +- lib/mongo/open_telemetry/statement_builder.rb | 48 +++++++ lib/mongo/open_telemetry/tracer.rb | 130 +++++++++++++++++ lib/mongo/operation/context.rb | 3 + lib/mongo/operation/insert/op_msg.rb | 7 +- lib/mongo/operation/shared/executable.rb | 25 ++-- 10 files changed, 208 insertions(+), 230 deletions(-) delete mode 100644 lib/mongo/monitoring/open_telemetry/statement_builder.rb delete mode 100644 lib/mongo/monitoring/open_telemetry/tracer.rb rename lib/mongo/{monitoring => }/open_telemetry.rb (73%) create mode 100644 lib/mongo/open_telemetry/statement_builder.rb create mode 100644 lib/mongo/open_telemetry/tracer.rb diff --git a/lib/mongo.rb b/lib/mongo.rb index c866ad1a9e..a28ad29859 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -47,6 +47,7 @@ require 'mongo/monitoring' require 'mongo/logger' require 'mongo/retryable' +require 'mongo/open_telemetry' require 'mongo/operation' require 'mongo/error' require 'mongo/event' diff --git a/lib/mongo/monitoring.rb b/lib/mongo/monitoring.rb index c977dee2e1..df22a94c36 100644 --- a/lib/mongo/monitoring.rb +++ b/lib/mongo/monitoring.rb @@ -236,11 +236,8 @@ def initialize(options = {}) subscribe(TOPOLOGY_CHANGED, TopologyChangedLogSubscriber.new(options)) subscribe(TOPOLOGY_CLOSED, TopologyClosedLogSubscriber.new(options)) end - @tracer = options[:otel_tracer] || OpenTelemetry::Tracer.new end - attr_reader :tracer - # @api private attr_reader :options @@ -372,7 +369,6 @@ def initialize_copy(original) require 'mongo/monitoring/publishable' require 'mongo/monitoring/command_log_subscriber' require 'mongo/monitoring/cmap_log_subscriber' -require 'mongo/monitoring/open_telemetry' require 'mongo/monitoring/sdam_log_subscriber' require 'mongo/monitoring/server_description_changed_log_subscriber' require 'mongo/monitoring/server_closed_log_subscriber' diff --git a/lib/mongo/monitoring/open_telemetry/statement_builder.rb b/lib/mongo/monitoring/open_telemetry/statement_builder.rb deleted file mode 100644 index a9f6f15eb3..0000000000 --- a/lib/mongo/monitoring/open_telemetry/statement_builder.rb +++ /dev/null @@ -1,74 +0,0 @@ -# frozen_string_literal: true - -# -# Copyright (C) 2015-present MongoDB Inc. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -module Mongo - class Monitoring - # This module contains classes related to OpenTelemetry instrumentation. - # - # @api private - module OpenTelemetry - # This class is used to build a +db.statement+ attribute for an OpenTelemetry span - # from a MongoDB command. - class StatementBuilder - # @param [ BSON::Document ] command The message that will be - # sent to the server. - # @param [ Boolean ] obfuscate Whether to obfuscate the statement. - def initialize(command, obfuscate) - @command = command - @command_name, @collection = command.first - @obfuscate = obfuscate - end - - # Builds the statement. - # - # @return [ String ] The statement as a JSON string. - def build - statement.to_json.freeze unless statement.empty? - end - - private - - def statement - mask(@command) - end - - def mask(hash) - hash.each_with_object({}) do |(k, v), h| - next if Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) - - value = case v - when Hash then mask(v) - when Array then v.map { |e| mask(e) } - else mask_value(k, v) - end - h[k] = value - end - end - - def mask_value(key, value) - if key == @command_name && value == @collection - @collection - elsif @obfuscate - '?' - else - value - end - end - end - end - end -end diff --git a/lib/mongo/monitoring/open_telemetry/tracer.rb b/lib/mongo/monitoring/open_telemetry/tracer.rb deleted file mode 100644 index 80627aad3e..0000000000 --- a/lib/mongo/monitoring/open_telemetry/tracer.rb +++ /dev/null @@ -1,133 +0,0 @@ -# frozen_string_literal: true - -# Copyright (C) 2015-present MongoDB Inc. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -module Mongo - class Monitoring - module OpenTelemetry - # This is a wrapper around OpenTelemetry tracer that provides a convenient - # interface for creating spans. - class Tracer - # Environment variable that enables otel instrumentation. - ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED' - - # Environment variable that controls the db.statement attribute. - # Possible values are: - # - omit: do not include db.statement attribute - # - obfuscate: obfuscate the attribute value - # - include: include the attribute value as is - # Default value is obfuscate. - ENV_VARIABLE_DB_STATEMENT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DB_STATEMENT' - - # Default value for db.statement attribute. - DB_STATEMENT_DEFAULT_VALUE = 'obfuscate' - - # Possible values for db.statement attribute. - DB_STATEMENT_VALUES = %i[omit obfuscate include].freeze - - # Name of the tracer. - OTEL_TRACER_NAME = 'mongo-ruby-driver' - - # @return [ OpenTelemetry::SDK::Trace::Tracer | nil ] The otel tracer. - attr_reader :ot_tracer - - def initialize - return unless defined?(::OpenTelemetry) - return unless ENV[ENV_VARIABLE_ENABLED] == 'true' - - @ot_tracer = ::OpenTelemetry.tracer_provider.tracer( - OTEL_TRACER_NAME, - Mongo::VERSION - ) - end - - # If otel instrumentation is enabled, creates a span with attributes - # for the message and operation and yields it to the block. - # Otherwise, yields to the block. - # - # @param [ Protocol::Message ] message The message. - # @param [ Operation ] operation The operation. - # @param [ Address ] address The address of the server the message is sent to. - # @param [ Proc ] &block The block to be executed. - def in_span(message, operation, address, &block) - if enabled? - @ot_tracer.in_span( - span_name(message, operation), - attributes: attributes(message, operation, address), - kind: :client, - &block - ) - else - yield - end - end - - private - - # @return [ true | false ] Whether otel instrumentation is enabled. - def enabled? - @ot_tracer != nil - end - - # Validates and returns the value of db.statement attribute of the span. - # - # @return [ Symbol ] The value of db.statement attribute. - # @raise [ ArgumentError ] If the value is invalid. - def db_statement - @db_statement ||= ENV.fetch(ENV_VARIABLE_DB_STATEMENT, DB_STATEMENT_DEFAULT_VALUE).to_sym.tap do |statement| - unless DB_STATEMENT_VALUES.include?(statement) - raise ArgumentError, "Invalid value for #{ENV_VARIABLE_DB_STATEMENT}: #{statement}" - end - end - end - - # @return [ true | false ] Whether db.statement attribute should be omitted. - def omit? - db_statement == :omit - end - - # @return [ true | false ] Whether db.statement attribute should be obfuscated. - def obfuscate? - db_statement == :obfuscate - end - - # @return [ String ] The name of the span. - def span_name(message, operation) - collection = operation.spec[:coll_name] - command_name = message.payload[:command_name] - if collection - "#{collection}.#{command_name}" - else - command_name - end - end - - # @return [ Hash ] The attributes of the span. - def attributes(message, operation, address) - { - 'db.system' => 'mongodb', - 'db.name' => message.payload[:database_name], - 'db.operation' => message.payload[:command_name], - 'net.peer.name' => address.host, - 'net.peer.port' => address.port, - }.tap do |attributes| - attributes['db.mongodb.collection'] = operation.spec[:coll_name] unless operation.spec[:coll_name].nil? - attributes['db.statement'] = StatementBuilder.new(message.payload[:command], obfuscate?).build unless omit? - end - end - end - end - end -end diff --git a/lib/mongo/monitoring/open_telemetry.rb b/lib/mongo/open_telemetry.rb similarity index 73% rename from lib/mongo/monitoring/open_telemetry.rb rename to lib/mongo/open_telemetry.rb index 4e2a847269..8f5eb21319 100644 --- a/lib/mongo/monitoring/open_telemetry.rb +++ b/lib/mongo/open_telemetry.rb @@ -15,12 +15,15 @@ # limitations under the License. module Mongo - class Monitoring - # @api private - module OpenTelemetry + # Container class for OpenTelemetry functionality. + # + # @api private + class OpenTelemetry + def self.tracer + @tracer ||= Tracer.new end end end -require 'mongo/monitoring/open_telemetry/statement_builder' -require 'mongo/monitoring/open_telemetry/tracer' +require 'mongo/open_telemetry/statement_builder' +require 'mongo/open_telemetry/tracer' diff --git a/lib/mongo/open_telemetry/statement_builder.rb b/lib/mongo/open_telemetry/statement_builder.rb new file mode 100644 index 0000000000..d467c1f7d6 --- /dev/null +++ b/lib/mongo/open_telemetry/statement_builder.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +# +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class OpenTelemetry + # This class is used to build +db.statement+ attribute for an OpenTelemetry span + # from a MongoDB command. + class StatementBuilder + # @param [ BSON::Document ] command The message that will be + # sent to the server. + def initialize(command) + @command = command + @command_name, @collection = command.first + end + + # Builds the statement. + # + # @return [ String ] The statement as a JSON string. + def build + statement.to_json.freeze unless statement.empty? + end + + private + + def statement + mask(@command) + end + + def mask(hash) + hash.reject { |k, v| Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) } + end + end + end +end diff --git a/lib/mongo/open_telemetry/tracer.rb b/lib/mongo/open_telemetry/tracer.rb new file mode 100644 index 0000000000..4e55826fe9 --- /dev/null +++ b/lib/mongo/open_telemetry/tracer.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class OpenTelemetry + # This is a wrapper around OpenTelemetry tracer that provides a convenient + # interface for creating spans. + class Tracer + # Environment variable that enables otel instrumentation. + ENV_VARIABLE_DISABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' + + # Environment variable that controls the db.statement attribute. + ENV_VARIABLE_QUERY_TEXT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT' + + # Name of the tracer. + OTEL_TRACER_NAME = 'mongo-ruby-driver' + + # @return [ OpenTelemetry::SDK::Trace::Tracer | nil ] The otel tracer. + attr_reader :ot_tracer + + def initialize + return unless defined?(::OpenTelemetry) + return if %w[ 1 yes true ].include?(ENV[ENV_VARIABLE_DISABLED]) + + @ot_tracer = ::OpenTelemetry.tracer_provider.tracer( + OTEL_TRACER_NAME, + Mongo::VERSION + ) + end + + def trace_message(command, address, &block) + in_span(span_name(command), build_attributes(command, address), &block) + end + + def add_query_text(span, message) + return unless span && query_text? + + span.add_attributes( + 'db.query.text' => StatementBuilder.new(message.payload[:command]).build + ) + end + + # @param [ OpenTelemetry::Trace::Span | nil ] span + # @param [ Mongo::Operation::Result ] result + def add_attributes_from_result(span, result) + return if span.nil? + + if result.successful? + if (cursor_id = result.cursor_id).positive? + span.add_attributes( + 'db.mongodb.cursor_id' => cursor_id + ) + end + else + span.record_exception(result.error) + end + end + + private + + def in_span(name, attributes = {}, &block) + if enabled? + @ot_tracer.in_span(name, attributes: attributes, kind: :client, &block) + else + yield + end + end + + # @return [ true, false ] Whether otel instrumentation is enabled. + def enabled? + @ot_tracer != nil + end + + def query_text? + %w[ 1 yes true ].include?(ENV[ENV_VARIABLE_QUERY_TEXT]) + end + + # @return [ String ] The name of the span. + def span_name(command) + collection = collection_name(command) + command_name = command.keys.first + if collection + "#{collection}.#{command_name}" + else + command_name + end + end + + # @return [ Hash ] The attributes of the span. + def build_attributes(command, address) + command_name = command.keys.first + { + 'db.system' => 'mongodb', + 'db.namespace' => command['$db'], + 'db.operation.name' => command_name, + 'server.port' => address.port, + 'net.peer.port' => address.port, + 'server.address' => address.host, + 'net.peer.address' => address.host, + 'db.query.summary' => span_name(command) + }.tap do |attributes| + if (coll_name = collection_name(command)) + attributes['db.collection.name'] = coll_name + end + if command_name == 'getMore' + attributes['db.mongodb.cursor_id'] = command[command_name].value + end + end + end + + # @return [ String | nil] Name of collection the operation is executed on. + def collection_name(command) + command.values.first if command.values.first.is_a?(String) + end + end + end +end diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index 03d6e0957d..bf38354dda 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -41,6 +41,7 @@ def initialize( connection_global_id: nil, operation_timeouts: {}, view: nil, + tracer: OpenTelemetry.tracer, options: nil ) if options @@ -61,6 +62,7 @@ def initialize( @session = session @view = view @connection_global_id = connection_global_id + @tracer = tracer @options = options super(session: session, operation_timeouts: operation_timeouts) end @@ -69,6 +71,7 @@ def initialize( attr_reader :session attr_reader :view attr_reader :options + attr_reader :tracer # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 39b299ef76..ed5842bff2 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -34,8 +34,11 @@ class OpMsg < OpMsgBase private def get_result(connection, context, options = {}) - # This is a Mongo::Operation::Insert::Result - Result.new(*dispatch_message(connection, context), @ids, context: context) + context.tracer.trace_message(command(connection), connection.address) do |span| + # This is a Mongo::Operation::Insert::Result + Result.new(*dispatch_message(connection, context, span), @ids, context: context) + context.tracer.add_attributes_from_result(span, result) + end end def selector(connection) diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 5f1c48c161..0be3a68e65 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -32,7 +32,7 @@ module Executable # execute this operation. attr_accessor :context - def do_execute(connection, context, options = {}) + def do_execute(connection, context, span, options = {}) # Save the context on the instance, to avoid having to pass it as a # parameter to every single method. There are many legacy methods that # still accept it as a parameter, which are left as-is for now to @@ -40,12 +40,11 @@ def do_execute(connection, context, options = {}) # reasonable to refactor things so this saved reference is used instead. @context = context - session&.materialize_if_needed unpin_maybe(session, connection) do add_error_labels(connection, context) do check_for_network_error do add_server_diagnostics(connection) do - get_result(connection, context, options).tap do |result| + get_result(connection, context, span, options).tap do |result| if session if session.in_transaction? && connection.description.load_balancer? @@ -92,8 +91,12 @@ def execute(connection, context:, options: {}) end end - do_execute(connection, context, options).tap do |result| - validate_result(result, connection, context) + session&.materialize_if_needed + context.tracer.trace_message(command(connection), connection.address) do |span| + do_execute(connection, context, span, options).tap do |result| + context.tracer.add_attributes_from_result(span, result) + validate_result(result, connection, context) + end end end @@ -103,18 +106,16 @@ def result_class Result end - def get_result(connection, context, options = {}) - result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection) + def get_result(connection, context, span, options = {}) + result_class.new(*dispatch_message(connection, context, span, options), context: context, connection: connection) end # Returns a Protocol::Message or nil as reply. - def dispatch_message(connection, context, options = {}) + def dispatch_message(connection, context, span, options = {}) message = build_message(connection, context) message = message.maybe_encrypt(connection, context) - reply = nil - connection.server.cluster.monitoring.tracer.in_span(message, self, connection.address) do - reply = connection.dispatch([ message ], context, options) - end + context.tracer.add_query_text(span, message) + reply = connection.dispatch([ message ], context, options) [reply, connection.description, connection.global_id] end From 36fa7a4ed27a08d4549d06c0d6132f9d1e1962e9 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Mon, 20 Jan 2025 15:10:03 +0100 Subject: [PATCH 7/8] otel --- lib/mongo/collection/view/iterable.rb | 38 ++++++++++++------ lib/mongo/cursor.rb | 13 ++++-- lib/mongo/open_telemetry/tracer.rb | 50 ++++++++++++------------ lib/mongo/operation/context.rb | 2 + lib/mongo/operation/shared/executable.rb | 21 +++++----- lib/mongo/retryable/read_worker.rb | 1 + lib/mongo/server/connection_base.rb | 2 + 7 files changed, 74 insertions(+), 53 deletions(-) diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 99133c5e9f..1ce457208e 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -88,20 +88,34 @@ def select_cursor(session) operation_timeouts: operation_timeouts, view: self ) - - if respond_to?(:write?, true) && write? - server = server_selector.select_server(cluster, nil, session, write_aggregation: true) - result = send_initial_query(server, context) - - if use_query_cache? - CachingCursor.new(view, result, server, session: session, context: context) + op = initial_query_op(session) + operation_name = op.is_a?(Mongo::Operation::Find) ? 'find' : 'explain' + attrs = { + 'db.system' => 'mongodb', + 'db.namespace' => op.spec[:db_name], + 'db.collection.name' => op.spec[:coll_name], + 'db.operation.name' => operation_name, + 'db.query.summary' => "#{op.spec[:coll_name]}.#{operation_name}" + } + context.tracer.in_span("#{op.spec[:coll_name]}.#{operation_name}", attrs) do |span| + context.current_span = span + if respond_to?(:write?, true) && write? + server = server_selector.select_server(cluster, nil, session, write_aggregation: true) + context.tracer.add_event(span, 'server selected', {}) + result = send_initial_query(server, context) + + if use_query_cache? + CachingCursor.new(view, result, server, session: session, context: context) + else + Cursor.new(view, result, server, session: session, context: context) + end else - Cursor.new(view, result, server, session: session, context: context) - end - else - read_with_retry_cursor(session, server_selector, view, context: context) do |server| - send_initial_query(server, context) + read_with_retry_cursor(session, server_selector, view, context: context) do |server| + send_initial_query(server, context) + end end + ensure + context.current_span = nil end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 0e0927f02c..84fd473a09 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -514,10 +514,15 @@ def unregister def execute_operation(op, context: nil) op_context = context || possibly_refreshed_context - if @connection.nil? - op.execute(@server, context: op_context) - else - op.execute_with_connection(@connection, context: op_context) + op_context.tracer.in_span('getMore', {}) do |span| + op_context.current_span = span + if @connection.nil? + op.execute(@server, context: op_context) + else + op.execute_with_connection(@connection, context: op_context) + end + ensure + op_context.current_span = nil end end diff --git a/lib/mongo/open_telemetry/tracer.rb b/lib/mongo/open_telemetry/tracer.rb index 4e55826fe9..47daa911ff 100644 --- a/lib/mongo/open_telemetry/tracer.rb +++ b/lib/mongo/open_telemetry/tracer.rb @@ -41,14 +41,24 @@ def initialize ) end - def trace_message(command, address, &block) - in_span(span_name(command), build_attributes(command, address), &block) + def in_span(name, attributes = {}, &block) + if enabled? + @ot_tracer.in_span(name, attributes: attributes, kind: :client, &block) + else + yield + end + end + + def add_attributes_from_command(span, command, address) + return unless enabled? + + span&.add_attributes(build_attributes(command, address)) end def add_query_text(span, message) - return unless span && query_text? + return unless enabled? && query_text? - span.add_attributes( + span&.add_attributes( 'db.query.text' => StatementBuilder.new(message.payload[:command]).build ) end @@ -56,29 +66,27 @@ def add_query_text(span, message) # @param [ OpenTelemetry::Trace::Span | nil ] span # @param [ Mongo::Operation::Result ] result def add_attributes_from_result(span, result) - return if span.nil? + return unless enabled? if result.successful? if (cursor_id = result.cursor_id).positive? - span.add_attributes( + span&.add_attributes( 'db.mongodb.cursor_id' => cursor_id ) end else - span.record_exception(result.error) + span&.record_exception(result.error) end end - private + def add_event(span, name, attributes = nil) + return unless enabled? - def in_span(name, attributes = {}, &block) - if enabled? - @ot_tracer.in_span(name, attributes: attributes, kind: :client, &block) - else - yield - end + span&.add_event(name, attributes: attributes) end + private + # @return [ true, false ] Whether otel instrumentation is enabled. def enabled? @ot_tracer != nil @@ -103,28 +111,18 @@ def span_name(command) def build_attributes(command, address) command_name = command.keys.first { - 'db.system' => 'mongodb', - 'db.namespace' => command['$db'], - 'db.operation.name' => command_name, 'server.port' => address.port, 'net.peer.port' => address.port, 'server.address' => address.host, 'net.peer.address' => address.host, - 'db.query.summary' => span_name(command) }.tap do |attributes| - if (coll_name = collection_name(command)) - attributes['db.collection.name'] = coll_name - end if command_name == 'getMore' attributes['db.mongodb.cursor_id'] = command[command_name].value end end end - - # @return [ String | nil] Name of collection the operation is executed on. - def collection_name(command) - command.values.first if command.values.first.is_a?(String) - end end end end + +# ``` diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index bf38354dda..b2600e6202 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -73,6 +73,8 @@ def initialize( attr_reader :options attr_reader :tracer + attr_accessor :current_span + # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. # diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 0be3a68e65..8eeb534c78 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -32,7 +32,7 @@ module Executable # execute this operation. attr_accessor :context - def do_execute(connection, context, span, options = {}) + def do_execute(connection, context, options = {}) # Save the context on the instance, to avoid having to pass it as a # parameter to every single method. There are many legacy methods that # still accept it as a parameter, which are left as-is for now to @@ -44,7 +44,7 @@ def do_execute(connection, context, span, options = {}) add_error_labels(connection, context) do check_for_network_error do add_server_diagnostics(connection) do - get_result(connection, context, span, options).tap do |result| + get_result(connection, context, options).tap do |result| if session if session.in_transaction? && connection.description.load_balancer? @@ -92,11 +92,10 @@ def execute(connection, context:, options: {}) end session&.materialize_if_needed - context.tracer.trace_message(command(connection), connection.address) do |span| - do_execute(connection, context, span, options).tap do |result| - context.tracer.add_attributes_from_result(span, result) - validate_result(result, connection, context) - end + context.tracer.add_attributes_from_command(context.current_span, command(connection), connection.address) + do_execute(connection, context, options).tap do |result| + context.tracer.add_attributes_from_result(context.current_span, result) + validate_result(result, connection, context) end end @@ -106,15 +105,15 @@ def result_class Result end - def get_result(connection, context, span, options = {}) - result_class.new(*dispatch_message(connection, context, span, options), context: context, connection: connection) + def get_result(connection, context, options = {}) + result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection) end # Returns a Protocol::Message or nil as reply. - def dispatch_message(connection, context, span, options = {}) + def dispatch_message(connection, context, options = {}) message = build_message(connection, context) message = message.maybe_encrypt(connection, context) - context.tracer.add_query_text(span, message) + context.tracer.add_query_text(context.current_span, message) reply = connection.dispatch([ message ], context, options) [reply, connection.description, connection.global_id] end diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index eb3272ca34..3dde638618 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -202,6 +202,7 @@ def modern_read_with_retry(session, server_selector, context, &block) session, timeout: context&.remaining_timeout_sec ) + context&.tracer&.add_event(context&.current_span, 'Server Selected') yield server rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e e.add_notes('modern retry', 'attempt 1') diff --git a/lib/mongo/server/connection_base.rb b/lib/mongo/server/connection_base.rb index 2803cb9c45..c73012c522 100644 --- a/lib/mongo/server/connection_base.rb +++ b/lib/mongo/server/connection_base.rb @@ -169,6 +169,7 @@ def deliver(message, context, options = {}) raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})" end buffer = serialize(message, context) + context&.tracer&.add_event(context&.current_span, 'Message Ready') check_timeout!(context) ensure_connected do |socket| operation_id = Monitoring.next_operation_id @@ -207,6 +208,7 @@ def deliver(message, context, options = {}) server_connection_id: description.server_connection_id, service_id: description.service_id, ) + context&.tracer&.add_event(context&.current_span, 'Response Received') end if result && context.decrypt? result = result.maybe_decrypt(context) From 9f5a3e6d8c42051a444579e47757e9288ee64360 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 30 Jan 2025 16:25:04 +0100 Subject: [PATCH 8/8] otel --- lib/mongo/collection/view/iterable.rb | 9 +++---- lib/mongo/cursor.rb | 4 +++- lib/mongo/open_telemetry/tracer.rb | 30 +++++++++++++++++++----- lib/mongo/operation/context.rb | 2 +- lib/mongo/operation/shared/executable.rb | 12 ++++++---- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 1ce457208e..d4fa4be56f 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -94,14 +94,14 @@ def select_cursor(session) 'db.system' => 'mongodb', 'db.namespace' => op.spec[:db_name], 'db.collection.name' => op.spec[:coll_name], - 'db.operation.name' => operation_name, - 'db.query.summary' => "#{op.spec[:coll_name]}.#{operation_name}" + 'db.operation.name' => 'find', + 'db.operation.summary' => "find #{op.spec[:coll_name]}" } - context.tracer.in_span("#{op.spec[:coll_name]}.#{operation_name}", attrs) do |span| + context.tracer.in_span("find #{op.spec[:coll_name]}", attrs) do |span, ctx| context.current_span = span + context.current_context = ctx if respond_to?(:write?, true) && write? server = server_selector.select_server(cluster, nil, session, write_aggregation: true) - context.tracer.add_event(span, 'server selected', {}) result = send_initial_query(server, context) if use_query_cache? @@ -116,6 +116,7 @@ def select_cursor(session) end ensure context.current_span = nil + context.current_context = nil end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 84fd473a09..11d5f4bad2 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -514,8 +514,9 @@ def unregister def execute_operation(op, context: nil) op_context = context || possibly_refreshed_context - op_context.tracer.in_span('getMore', {}) do |span| + op_context.tracer.in_span('getMore', {}) do |span, ctx| op_context.current_span = span + op_context.current_context = ctx if @connection.nil? op.execute(@server, context: op_context) else @@ -523,6 +524,7 @@ def execute_operation(op, context: nil) end ensure op_context.current_span = nil + op_context.current_context = nil end end diff --git a/lib/mongo/open_telemetry/tracer.rb b/lib/mongo/open_telemetry/tracer.rb index 47daa911ff..b50af9c6f7 100644 --- a/lib/mongo/open_telemetry/tracer.rb +++ b/lib/mongo/open_telemetry/tracer.rb @@ -41,6 +41,17 @@ def initialize ) end + def start_span(command, address, parent) + if enabled? + attributes = build_attributes(command, address) + @ot_tracer.start_span( + span_name(command), + with_parent: parent, + attributes: attributes + ) + end + end + def in_span(name, attributes = {}, &block) if enabled? @ot_tracer.in_span(name, attributes: attributes, kind: :client, &block) @@ -79,10 +90,7 @@ def add_attributes_from_result(span, result) end end - def add_event(span, name, attributes = nil) - return unless enabled? - - span&.add_event(name, attributes: attributes) + def add_event(span, name, attributes = {}) end private @@ -111,18 +119,28 @@ def span_name(command) def build_attributes(command, address) command_name = command.keys.first { + 'db.system' => 'mongodb', + 'db.namespace' => command['$db'], + 'db.command.name' => command_name, 'server.port' => address.port, 'net.peer.port' => address.port, 'server.address' => address.host, 'net.peer.address' => address.host, + 'db.query.summary' => span_name(command) }.tap do |attributes| + if (coll_name = collection_name(command)) + attributes['db.collection.name'] = coll_name + end if command_name == 'getMore' attributes['db.mongodb.cursor_id'] = command[command_name].value end end end + + # @return [ String | nil] Name of collection the operation is executed on. + def collection_name(command) + command.values.first if command.values.first.is_a?(String) + end end end end - -# ``` diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index b2600e6202..0b590a4830 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -73,7 +73,7 @@ def initialize( attr_reader :options attr_reader :tracer - attr_accessor :current_span + attr_accessor :current_span, :current_context # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 8eeb534c78..6a0b81efd4 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -92,10 +92,14 @@ def execute(connection, context:, options: {}) end session&.materialize_if_needed - context.tracer.add_attributes_from_command(context.current_span, command(connection), connection.address) - do_execute(connection, context, options).tap do |result| - context.tracer.add_attributes_from_result(context.current_span, result) - validate_result(result, connection, context) + begin + span = context.tracer.start_span(command(connection), connection.address, context.current_context) + do_execute(connection, context, options).tap do |result| + context.tracer.add_attributes_from_result(span, result) + validate_result(result, connection, context) + end + ensure + span&.finish end end