diff --git a/gemfiles/open_telemetry.gemfile b/gemfiles/open_telemetry.gemfile new file mode 100644 index 0000000000..7aba60643d --- /dev/null +++ b/gemfiles/open_telemetry.gemfile @@ -0,0 +1,9 @@ +# rubocop:todo all +source "https://rubygems.org" +gemspec path: '..' + +gem 'opentelemetry-sdk' + +require_relative './standard' + +standard_dependencies diff --git a/lib/mongo.rb b/lib/mongo.rb index c866ad1a9e..a28ad29859 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -47,6 +47,7 @@ require 'mongo/monitoring' require 'mongo/logger' require 'mongo/retryable' +require 'mongo/open_telemetry' require 'mongo/operation' require 'mongo/error' require 'mongo/event' diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 99133c5e9f..d4fa4be56f 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -88,20 +88,35 @@ def select_cursor(session) operation_timeouts: operation_timeouts, view: self ) - - if respond_to?(:write?, true) && write? - server = server_selector.select_server(cluster, nil, session, write_aggregation: true) - result = send_initial_query(server, context) - - if use_query_cache? - CachingCursor.new(view, result, server, session: session, context: context) + op = initial_query_op(session) + operation_name = op.is_a?(Mongo::Operation::Find) ? 'find' : 'explain' + attrs = { + 'db.system' => 'mongodb', + 'db.namespace' => op.spec[:db_name], + 'db.collection.name' => op.spec[:coll_name], + 'db.operation.name' => 'find', + 'db.operation.summary' => "find #{op.spec[:coll_name]}" + } + context.tracer.in_span("find #{op.spec[:coll_name]}", attrs) do |span, ctx| + context.current_span = span + context.current_context = ctx + if respond_to?(:write?, true) && write? + server = server_selector.select_server(cluster, nil, session, write_aggregation: true) + result = send_initial_query(server, context) + + if use_query_cache? + CachingCursor.new(view, result, server, session: session, context: context) + else + Cursor.new(view, result, server, session: session, context: context) + end else - Cursor.new(view, result, server, session: session, context: context) - end - else - read_with_retry_cursor(session, server_selector, view, context: context) do |server| - send_initial_query(server, context) + read_with_retry_cursor(session, server_selector, view, context: context) do |server| + send_initial_query(server, context) + end end + ensure + context.current_span = nil + context.current_context = nil end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 0e0927f02c..11d5f4bad2 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -514,10 +514,17 @@ def unregister def execute_operation(op, context: nil) op_context = context || possibly_refreshed_context - if @connection.nil? - op.execute(@server, context: op_context) - else - op.execute_with_connection(@connection, context: op_context) + op_context.tracer.in_span('getMore', {}) do |span, ctx| + op_context.current_span = span + op_context.current_context = ctx + if @connection.nil? + op.execute(@server, context: op_context) + else + op.execute_with_connection(@connection, context: op_context) + end + ensure + op_context.current_span = nil + op_context.current_context = nil end end diff --git a/lib/mongo/open_telemetry.rb b/lib/mongo/open_telemetry.rb new file mode 100644 index 0000000000..8f5eb21319 --- /dev/null +++ b/lib/mongo/open_telemetry.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + # Container class for OpenTelemetry functionality. + # + # @api private + class OpenTelemetry + def self.tracer + @tracer ||= Tracer.new + end + end +end + +require 'mongo/open_telemetry/statement_builder' +require 'mongo/open_telemetry/tracer' diff --git a/lib/mongo/open_telemetry/statement_builder.rb b/lib/mongo/open_telemetry/statement_builder.rb new file mode 100644 index 0000000000..d467c1f7d6 --- /dev/null +++ b/lib/mongo/open_telemetry/statement_builder.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +# +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class OpenTelemetry + # This class is used to build +db.statement+ attribute for an OpenTelemetry span + # from a MongoDB command. + class StatementBuilder + # @param [ BSON::Document ] command The message that will be + # sent to the server. + def initialize(command) + @command = command + @command_name, @collection = command.first + end + + # Builds the statement. + # + # @return [ String ] The statement as a JSON string. + def build + statement.to_json.freeze unless statement.empty? + end + + private + + def statement + mask(@command) + end + + def mask(hash) + hash.reject { |k, v| Mongo::Protocol::Msg::INTERNAL_KEYS.include?(k.to_s) } + end + end + end +end diff --git a/lib/mongo/open_telemetry/tracer.rb b/lib/mongo/open_telemetry/tracer.rb new file mode 100644 index 0000000000..b50af9c6f7 --- /dev/null +++ b/lib/mongo/open_telemetry/tracer.rb @@ -0,0 +1,146 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class OpenTelemetry + # This is a wrapper around OpenTelemetry tracer that provides a convenient + # interface for creating spans. + class Tracer + # Environment variable that enables otel instrumentation. + ENV_VARIABLE_DISABLED = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_DISABLED' + + # Environment variable that controls the db.statement attribute. + ENV_VARIABLE_QUERY_TEXT = 'OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT' + + # Name of the tracer. + OTEL_TRACER_NAME = 'mongo-ruby-driver' + + # @return [ OpenTelemetry::SDK::Trace::Tracer | nil ] The otel tracer. + attr_reader :ot_tracer + + def initialize + return unless defined?(::OpenTelemetry) + return if %w[ 1 yes true ].include?(ENV[ENV_VARIABLE_DISABLED]) + + @ot_tracer = ::OpenTelemetry.tracer_provider.tracer( + OTEL_TRACER_NAME, + Mongo::VERSION + ) + end + + def start_span(command, address, parent) + if enabled? + attributes = build_attributes(command, address) + @ot_tracer.start_span( + span_name(command), + with_parent: parent, + attributes: attributes + ) + end + end + + def in_span(name, attributes = {}, &block) + if enabled? + @ot_tracer.in_span(name, attributes: attributes, kind: :client, &block) + else + yield + end + end + + def add_attributes_from_command(span, command, address) + return unless enabled? + + span&.add_attributes(build_attributes(command, address)) + end + + def add_query_text(span, message) + return unless enabled? && query_text? + + span&.add_attributes( + 'db.query.text' => StatementBuilder.new(message.payload[:command]).build + ) + end + + # @param [ OpenTelemetry::Trace::Span | nil ] span + # @param [ Mongo::Operation::Result ] result + def add_attributes_from_result(span, result) + return unless enabled? + + if result.successful? + if (cursor_id = result.cursor_id).positive? + span&.add_attributes( + 'db.mongodb.cursor_id' => cursor_id + ) + end + else + span&.record_exception(result.error) + end + end + + def add_event(span, name, attributes = {}) + end + + private + + # @return [ true, false ] Whether otel instrumentation is enabled. + def enabled? + @ot_tracer != nil + end + + def query_text? + %w[ 1 yes true ].include?(ENV[ENV_VARIABLE_QUERY_TEXT]) + end + + # @return [ String ] The name of the span. + def span_name(command) + collection = collection_name(command) + command_name = command.keys.first + if collection + "#{collection}.#{command_name}" + else + command_name + end + end + + # @return [ Hash ] The attributes of the span. + def build_attributes(command, address) + command_name = command.keys.first + { + 'db.system' => 'mongodb', + 'db.namespace' => command['$db'], + 'db.command.name' => command_name, + 'server.port' => address.port, + 'net.peer.port' => address.port, + 'server.address' => address.host, + 'net.peer.address' => address.host, + 'db.query.summary' => span_name(command) + }.tap do |attributes| + if (coll_name = collection_name(command)) + attributes['db.collection.name'] = coll_name + end + if command_name == 'getMore' + attributes['db.mongodb.cursor_id'] = command[command_name].value + end + end + end + + # @return [ String | nil] Name of collection the operation is executed on. + def collection_name(command) + command.values.first if command.values.first.is_a?(String) + end + end + end +end diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index 03d6e0957d..0b590a4830 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -41,6 +41,7 @@ def initialize( connection_global_id: nil, operation_timeouts: {}, view: nil, + tracer: OpenTelemetry.tracer, options: nil ) if options @@ -61,6 +62,7 @@ def initialize( @session = session @view = view @connection_global_id = connection_global_id + @tracer = tracer @options = options super(session: session, operation_timeouts: operation_timeouts) end @@ -69,6 +71,9 @@ def initialize( attr_reader :session attr_reader :view attr_reader :options + attr_reader :tracer + + attr_accessor :current_span, :current_context # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 39b299ef76..ed5842bff2 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -34,8 +34,11 @@ class OpMsg < OpMsgBase private def get_result(connection, context, options = {}) - # This is a Mongo::Operation::Insert::Result - Result.new(*dispatch_message(connection, context), @ids, context: context) + context.tracer.trace_message(command(connection), connection.address) do |span| + # This is a Mongo::Operation::Insert::Result + Result.new(*dispatch_message(connection, context, span), @ids, context: context) + context.tracer.add_attributes_from_result(span, result) + end end def selector(connection) diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 041e4d1e5b..6a0b81efd4 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -40,7 +40,6 @@ def do_execute(connection, context, options = {}) # reasonable to refactor things so this saved reference is used instead. @context = context - session&.materialize_if_needed unpin_maybe(session, connection) do add_error_labels(connection, context) do check_for_network_error do @@ -92,8 +91,15 @@ def execute(connection, context:, options: {}) end end - do_execute(connection, context, options).tap do |result| - validate_result(result, connection, context) + session&.materialize_if_needed + begin + span = context.tracer.start_span(command(connection), connection.address, context.current_context) + do_execute(connection, context, options).tap do |result| + context.tracer.add_attributes_from_result(span, result) + validate_result(result, connection, context) + end + ensure + span&.finish end end @@ -111,6 +117,7 @@ def get_result(connection, context, options = {}) def dispatch_message(connection, context, options = {}) message = build_message(connection, context) message = message.maybe_encrypt(connection, context) + context.tracer.add_query_text(context.current_span, message) reply = connection.dispatch([ message ], context, options) [reply, connection.description, connection.global_id] end diff --git a/lib/mongo/retryable/read_worker.rb b/lib/mongo/retryable/read_worker.rb index eb3272ca34..3dde638618 100644 --- a/lib/mongo/retryable/read_worker.rb +++ b/lib/mongo/retryable/read_worker.rb @@ -202,6 +202,7 @@ def modern_read_with_retry(session, server_selector, context, &block) session, timeout: context&.remaining_timeout_sec ) + context&.tracer&.add_event(context&.current_span, 'Server Selected') yield server rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e e.add_notes('modern retry', 'attempt 1') diff --git a/lib/mongo/server/connection_base.rb b/lib/mongo/server/connection_base.rb index 2803cb9c45..c73012c522 100644 --- a/lib/mongo/server/connection_base.rb +++ b/lib/mongo/server/connection_base.rb @@ -169,6 +169,7 @@ def deliver(message, context, options = {}) raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})" end buffer = serialize(message, context) + context&.tracer&.add_event(context&.current_span, 'Message Ready') check_timeout!(context) ensure_connected do |socket| operation_id = Monitoring.next_operation_id @@ -207,6 +208,7 @@ def deliver(message, context, options = {}) server_connection_id: description.server_connection_id, service_id: description.service_id, ) + context&.tracer&.add_event(context&.current_span, 'Response Received') end if result && context.decrypt? result = result.maybe_decrypt(context) diff --git a/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb new file mode 100644 index 0000000000..74b3b15a69 --- /dev/null +++ b/spec/mongo/monitoring/open_telemetry/statement_builder_spec.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Mongo::Monitoring::OpenTelemetry::StatementBuilder do + shared_examples 'statement builder tests' do |obfuscate| + subject(:statement_builder) { described_class.new(command, obfuscate) } + + context 'with a find command' do + let(:command) { { 'find' => 'users', 'filter' => { 'name' => 'John' }, '$db' => 'test', 'lsid' => 'some_id' } } + + it 'ignores specified keys' do + statement = JSON.parse(statement_builder.build) + Mongo::Protocol::Msg::INTERNAL_KEYS.each do |key| + expect(statement).not_to have_key(key) + end + end + + it 'returns the correct statement' do + expected_filter = obfuscate ? { 'name' => '?' } : { 'name' => 'John' } + expected_statement = { 'find' => 'users', 'filter' => expected_filter }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with an aggregation pipeline' do + let(:pipeline) do + [ { '$match' => { 'status' => 'A' } }, + { '$group' => { '_id' => '$cust_id', 'total' => { '$sum' => '$amount' } } } ] + end + let(:command) { { 'aggregate' => 'orders', 'pipeline' => pipeline, '$db' => 'test' } } + + let(:expected_pipeline) do + if obfuscate + [ { '$match' => { 'status' => '?' } }, + { '$group' => { '_id' => '?', 'total' => { '$sum' => '?' } } } ] + else + pipeline + end + end + + it 'returns the correct statement' do + expected_statement = { 'aggregate' => 'orders', 'pipeline' => expected_pipeline }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with a findAndModify command' do + let(:command) do + { 'findAndModify' => 'users', 'query' => { 'name' => 'John' }, 'update' => { '$set' => { 'age' => 30 } }, + '$db' => 'test' } + end + + it 'returns the correct statement' do + expected_query = obfuscate ? { 'name' => '?' } : { 'name' => 'John' } + expected_update = obfuscate ? { '$set' => { 'age' => '?' } } : { '$set' => { 'age' => 30 } } + expected_statement = { 'findAndModify' => 'users', 'query' => expected_query, + 'update' => expected_update }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with an update command' do + let(:command) do + { 'update' => 'users', 'updates' => [ { 'q' => { 'name' => 'John' }, 'u' => { '$set' => { 'age' => 30 } } } ], + '$db' => 'test' } + end + + it 'returns the correct statement' do + expected_updates = if obfuscate + [ { 'q' => { 'name' => '?' }, + 'u' => { '$set' => { 'age' => '?' } } } ] + else + [ { 'q' => { 'name' => 'John' }, + 'u' => { '$set' => { 'age' => 30 } } } ] + end + expected_statement = { 'update' => 'users', 'updates' => expected_updates }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + + context 'with a delete command' do + let(:command) do + { 'delete' => 'users', 'deletes' => [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ], '$db' => 'test' } + end + + let(:expected_deletes) do + if obfuscate + [ { 'q' => { 'name' => '?' }, + 'limit' => '?' } ] + else + [ { 'q' => { 'name' => 'John' }, 'limit' => 1 } ] + end + end + + it 'returns the correct statement' do + expected_statement = { 'delete' => 'users', 'deletes' => expected_deletes }.to_json.freeze + expect(statement_builder.build).to eq(expected_statement) + end + end + end + + context 'when obfuscation is false' do + include_examples 'statement builder tests', false + end + + context 'when obfuscation is true' do + include_examples 'statement builder tests', true + end +end diff --git a/spec/mongo/monitoring/open_telemetry/tracer_spec.rb b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb new file mode 100644 index 0000000000..cf9b08bb5e --- /dev/null +++ b/spec/mongo/monitoring/open_telemetry/tracer_spec.rb @@ -0,0 +1,44 @@ +require 'opentelemetry/sdk' +require 'lite_spec_helper' + +RSpec.describe Mongo::Monitoring::OpenTelemetry::Tracer do + let(:tracer) { described_class.new } + + describe '#initialize' do + context 'when not enabled' do + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return(nil) + end + + it 'does not create a tracer' do + expect(tracer.ot_tracer).to be_nil + end + end + + context 'when enabled' do + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return('true') + end + + it 'creates the tracer' do + expect(tracer.ot_tracer).to be_a(OpenTelemetry::SDK::Trace::Tracer) + end + end + end + + describe '#in_span' do + context 'when not enabled' do + it 'yields to the block' do + expect { |b| tracer.in_span(nil, nil, nil, &b) }.to yield_control + end + end + + context 'when enabled' do + let(:ot_tracer) { instance_double(OpenTelemetry::SDK::Trace::Tracer) } + + before do + allow(ENV).to receive(:[]).with(described_class::ENV_VARIABLE_ENABLED).and_return('true') + end + end + end +end