diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 34194899e1..e6fe83a502 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -24,14 +24,13 @@ NoReturn, Optional, Sequence, - Union, ) from bson import CodecOptions, _convert_raw_document_lists_to_streams from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore +from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore from pymongo.response import PinnedResponse from pymongo.typings import _Address, _DocumentOut, _DocumentType @@ -145,7 +144,7 @@ async def _maybe_pin_connection(self, conn: AsyncConnection) -> None: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, @@ -328,7 +327,7 @@ def __init__( def _unpack_response( # type: ignore[override] self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[dict[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index f7c1671777..4e3adc1809 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -48,7 +48,6 @@ from pymongo.message import ( _GetMore, _OpMsg, - _OpReply, _Query, _RawBatchGetMore, _RawBatchQuery, @@ -864,7 +863,7 @@ def collation(self, collation: Optional[_CollationIn]) -> AsyncCursor[_DocumentT def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions, # type: ignore[type-arg] user_fields: Optional[Mapping[str, Any]] = None, @@ -1189,7 +1188,7 @@ def __init__( def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 3e73492efa..7d1067371e 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -32,7 +32,7 @@ from pymongo import _csot, helpers_shared, message from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION -from pymongo.message import _OpMsg, _OpReply +from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -75,7 +75,7 @@ async def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -84,7 +84,7 @@ async def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[Union[_OpReply, _OpMsg]] = None + reply: Optional[_OpMsg] = None docs: list[_DocumentOut] = [] with _CommandTelemetry(client, conn, spec, dbname, request_id, start) as cmd_telemetry: @@ -191,7 +191,7 @@ async def command( conn: AsyncConnection, dbname: str, spec: MutableMapping[str, Any], - is_mongos: bool, + is_mongos: bool, # noqa: ARG001 read_preference: Optional[_ServerMode], codec_options: CodecOptions[_DocumentType], session: Optional[AsyncClientSession], @@ -205,7 +205,7 @@ async def command( parse_write_concern_error: bool = False, collation: Optional[_CollationIn] = None, compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, + use_op_msg: bool = False, # noqa: ARG001 unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, exhaust_allowed: bool = False, @@ -239,14 +239,10 @@ async def command( :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. """ name = next(iter(spec)) - ns = dbname + ".$cmd" speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) if read_concern and not (session and session.in_transaction): if read_concern.level: spec["readConcern"] = read_concern.document @@ -271,21 +267,15 @@ async def command( conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, otherwise rely on the server to return an error. - if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) - max_doc_size = 0 + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. + if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..4bbc7045b3 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -102,7 +102,7 @@ ZlibContext, ZstdContext, ) - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode from pymongo.typings import _Address, _CollationIn @@ -235,13 +235,12 @@ async def unpin(self) -> None: await self.close_conn(ConnectionClosedReason.STALE) def hello_cmd(self) -> dict[str, Any]: - # Handshake spec requires us to use OP_MSG+hello command for the - # initial handshake in load balanced or stable API mode. + # As of PYTHON-5713, always use OP_MSG for the handshake since all + # supported servers (MongoDB 4.2+, wire version >= 8) support it. + self.op_msg_enabled = True if self.opts.server_api or self.hello_ok or self.opts.load_balanced: - self.op_msg_enabled = True return {HelloCompat.CMD: 1} - else: - return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} + return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} async def hello(self) -> Hello[dict[str, Any]]: return await self._hello(None, None) @@ -447,7 +446,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: await self._raise_connection_failure(error) - async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + async def receive_message(self, request_id: Optional[int]) -> _OpMsg: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. diff --git a/pymongo/message.py b/pymongo/message.py index b0d1ceb105..fdac2b4daa 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -39,14 +39,12 @@ import bson from bson import CodecOptions, _dict_to_bson, _make_c_string -from bson.int64 import Int64 from bson.raw_bson import ( _RAW_ARRAY_BSON_OPTIONS, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument, _inflate_bson, ) -from pymongo.hello import HelloCompat from pymongo.monitoring import _EventListeners try: @@ -57,12 +55,8 @@ _use_c = False from pymongo.errors import ( ConfigurationError, - CursorNotFound, DocumentTooLarge, - ExecutionTimeout, InvalidOperation, - NotPrimaryError, - OperationFailure, ProtocolError, ) from pymongo.read_preferences import ReadPreference, _ServerMode @@ -423,96 +417,6 @@ def _op_msg( command[identifier] = docs -def _query_impl( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], -) -> tuple[bytes, int]: - """Get an OP_QUERY message.""" - encoded = _dict_to_bson(query, False, opts) - if field_selector: - efs = _dict_to_bson(field_selector, False, opts) - else: - efs = b"" - max_bson_size = max(len(encoded), len(efs)) - return ( - b"".join( - [ - _pack_int(options), - bson._make_c_string(collection_name), - _pack_int(num_to_skip), - _pack_int(num_to_return), - encoded, - efs, - ] - ), - max_bson_size, - ) - - -def _query_compressed( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], - ctx: Union[SnappyContext, ZlibContext, ZstdContext], -) -> tuple[int, bytes, int]: - """Internal compressed query message helper.""" - op_query, max_bson_size = _query_impl( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - rid, msg = _compress(2004, op_query, ctx) - return rid, msg, max_bson_size - - -def _query_uncompressed( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], -) -> tuple[int, bytes, int]: - """Internal query message helper.""" - op_query, max_bson_size = _query_impl( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - rid, msg = __pack_message(2004, op_query) - return rid, msg, max_bson_size - - -if _use_c: - _query_uncompressed = _cmessage._query_message - - -def _query( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], - ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, -) -> tuple[int, bytes, int]: - """Get a **query** message.""" - if ctx: - return _query_compressed( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts, ctx - ) - return _query_uncompressed( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - - _pack_long_long = struct.Struct(" list[bytes | memoryview]: - """Check the response header from the database, without decoding BSON. - - Check the response for errors and unpack. - - Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or - OperationFailure. - - :param cursor_id: cursor_id we sent to get this response - - used for raising an informative exception when we get cursor id not - valid at server response. - """ - if self.flags & 1: - # Shouldn't get this response if we aren't doing a getMore - if cursor_id is None: - raise ProtocolError("No cursor id for getMore operation") - - # Fake a getMore command response. OP_GET_MORE provides no - # document. - msg = "Cursor not found, cursor id: %d" % (cursor_id,) - errobj = {"ok": 0, "errmsg": msg, "code": 43} - raise CursorNotFound(msg, 43, errobj) - elif self.flags & 2: - error_object: dict[str, Any] = bson.BSON(self.documents).decode() - # Fake the ok field if it doesn't exist. - error_object.setdefault("ok", 0) - if error_object["$err"].startswith(HelloCompat.LEGACY_ERROR): - raise NotPrimaryError(error_object["$err"], error_object) - elif error_object.get("code") == 50: - default_msg = "operation exceeded time limit" - raise ExecutionTimeout( - error_object.get("$err", default_msg), error_object.get("code"), error_object - ) - raise OperationFailure( - "database error: %s" % error_object.get("$err"), - error_object.get("code"), - error_object, - ) - if self.documents: - return [self.documents] - return [] - - def unpack_response( - self, - cursor_id: Optional[int] = None, - codec_options: CodecOptions[Any] = _UNICODE_REPLACE_CODEC_OPTIONS, - user_fields: Optional[Mapping[str, Any]] = None, - legacy_response: bool = False, - ) -> list[dict[str, Any]]: - """Unpack a response from the database and decode the BSON document(s). - - Check the response for errors and unpack, returning a dictionary - containing the response data. - - Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or - OperationFailure. - - :param cursor_id: cursor_id we sent to get this response - - used for raising an informative exception when we get cursor id not - valid at server response - :param codec_options: an instance of - :class:`~bson.codec_options.CodecOptions` - :param user_fields: Response fields that should be decoded - using the TypeDecoders from codec_options, passed to - bson._decode_all_selective. - """ - self.raw_response(cursor_id) - if legacy_response: - return bson.decode_all(self.documents, codec_options) - return bson._decode_all_selective(self.documents, codec_options, user_fields) - - def command_response(self, codec_options: CodecOptions[Any]) -> dict[str, Any]: - """Unpack a command response.""" - docs = self.unpack_response(codec_options=codec_options) - assert self.number_returned == 1 - return docs[0] - - def raw_command_response(self) -> NoReturn: - """Return the bytes of the command response.""" - # This should never be called on _OpReply. - raise NotImplementedError - - @property - def more_to_come(self) -> bool: - """Is the moreToCome bit set on this response?""" - return False - - @classmethod - def unpack(cls, msg: bytes | memoryview) -> _OpReply: - """Construct an _OpReply from raw bytes.""" - # PYTHON-945: ignore starting_from field. - flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg) - - documents = msg[20:] - return cls(flags, cursor_id, number_returned, documents) - - class _OpMsg: """A MongoDB OP_MSG response message.""" @@ -1541,8 +1330,7 @@ def unpack(cls, msg: bytes | memoryview) -> _OpMsg: return cls(flags, payload_document) -_UNPACK_REPLY: dict[int, Callable[[bytes | memoryview], Union[_OpReply, _OpMsg]]] = { - _OpReply.OP_CODE: _OpReply.unpack, +_UNPACK_REPLY: dict[int, Callable[[bytes | memoryview], _OpMsg]] = { _OpMsg.OP_CODE: _OpMsg.unpack, } @@ -1680,54 +1468,20 @@ def as_command( def get_message( self, read_preference: _ServerMode, conn: _AgnosticConnection, use_cmd: bool = False ) -> tuple[int, bytes, int]: - """Get a query message, possibly setting the secondaryOk bit.""" + """Get a query message""" # Use the read_preference decided by _socket_from_server. self.read_preference = read_preference - if read_preference.mode: - # Set the secondaryOk bit. - flags = self.flags | 4 - else: - flags = self.flags - - ns = self.namespace() - spec = self.spec - if use_cmd: - spec = self.as_command(conn)[0] - request_id, msg, size, _ = _op_msg( - 0, - spec, - self.db, - read_preference, - self.codec_options, - ctx=conn.compression_context, - ) - return request_id, msg, size - - # OP_QUERY treats ntoreturn of -1 and 1 the same, return - # one document and close the cursor. We have to use 2 for - # batch size if 1 is specified. - ntoreturn = self.batch_size == 1 and 2 or self.batch_size - if self.limit: - if ntoreturn: - ntoreturn = min(self.limit, ntoreturn) - else: - ntoreturn = self.limit - - if conn.is_mongos: - assert isinstance(spec, MutableMapping) - spec = _maybe_add_read_preference(spec, read_preference) - - return _query( - flags, - ns, - self.ntoskip, - ntoreturn, + spec = self.as_command(conn)[0] + request_id, msg, size, _ = _op_msg( + 0, spec, - None if use_cmd else self.fields, + self.db, + read_preference, self.codec_options, ctx=conn.compression_context, ) + return request_id, msg, size class _GetMore: diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 7c62a251f8..f4e2d2669b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -35,7 +35,7 @@ from pymongo.common import MAX_MESSAGE_SIZE from pymongo.compression_support import decompress from pymongo.errors import ProtocolError, _OperationCancelled -from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply +from pymongo.message import _UNPACK_REPLY, _OpMsg from pymongo.socket_checker import _errno_from_exception try: @@ -696,7 +696,7 @@ async def async_receive_message( conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE, -) -> Union[_OpReply, _OpMsg]: +) -> _OpMsg: """Receive a raw BSON message or raise socket.error.""" timeout: Optional[Union[float, int]] timeout = conn.conn.gettimeout @@ -745,7 +745,7 @@ async def async_receive_message( def receive_message( conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE -) -> Union[_OpReply, _OpMsg]: +) -> _OpMsg: """Receive a raw BSON message or raise socket.error.""" if _csot.get_timeout(): deadline = _csot.get_deadline() diff --git a/pymongo/response.py b/pymongo/response.py index 211ddf2354..1bd2411e14 100644 --- a/pymongo/response.py +++ b/pymongo/response.py @@ -15,12 +15,12 @@ """Represent a response from the server.""" from __future__ import annotations -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence if TYPE_CHECKING: from datetime import timedelta - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.typings import _Address, _AgnosticConnection, _DocumentOut @@ -29,7 +29,7 @@ class Response: def __init__( self, - data: Union[_OpMsg, _OpReply], + data: _OpMsg, address: _Address, request_id: int, duration: Optional[timedelta], @@ -52,7 +52,7 @@ def __init__( self._docs = docs @property - def data(self) -> Union[_OpMsg, _OpReply]: + def data(self) -> _OpMsg: """Server response's raw BSON bytes.""" return self._data @@ -87,7 +87,7 @@ class PinnedResponse(Response): def __init__( self, - data: Union[_OpMsg, _OpReply], + data: _OpMsg, address: _Address, conn: _AgnosticConnection, request_id: int, diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index b2023ad5de..ba21bdc33e 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -24,13 +24,12 @@ NoReturn, Optional, Sequence, - Union, ) from bson import CodecOptions, _convert_raw_document_lists_to_streams from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore +from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore from pymongo.response import PinnedResponse from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase from pymongo.typings import _Address, _DocumentOut, _DocumentType @@ -145,7 +144,7 @@ def _maybe_pin_connection(self, conn: Connection) -> None: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, @@ -328,7 +327,7 @@ def __init__( def _unpack_response( # type: ignore[override] self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[dict[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 909671dac5..8085d58669 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -46,7 +46,6 @@ from pymongo.message import ( _GetMore, _OpMsg, - _OpReply, _Query, _RawBatchGetMore, _RawBatchQuery, @@ -862,7 +861,7 @@ def collation(self, collation: Optional[_CollationIn]) -> Cursor[_DocumentType]: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions, # type: ignore[type-arg] user_fields: Optional[Mapping[str, Any]] = None, @@ -1185,7 +1184,7 @@ def __init__(self, collection: Collection[_DocumentType], *args: Any, **kwargs: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index a7def8d89a..233a05cf84 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -32,7 +32,7 @@ from pymongo import _csot, helpers_shared, message from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION -from pymongo.message import _OpMsg, _OpReply +from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -75,7 +75,7 @@ def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -84,7 +84,7 @@ def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[Union[_OpReply, _OpMsg]] = None + reply: Optional[_OpMsg] = None docs: list[_DocumentOut] = [] with _CommandTelemetry(client, conn, spec, dbname, request_id, start) as cmd_telemetry: @@ -191,7 +191,7 @@ def command( conn: Connection, dbname: str, spec: MutableMapping[str, Any], - is_mongos: bool, + is_mongos: bool, # noqa: ARG001 read_preference: Optional[_ServerMode], codec_options: CodecOptions[_DocumentType], session: Optional[ClientSession], @@ -205,7 +205,7 @@ def command( parse_write_concern_error: bool = False, collation: Optional[_CollationIn] = None, compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, + use_op_msg: bool = False, # noqa: ARG001 unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, exhaust_allowed: bool = False, @@ -239,14 +239,10 @@ def command( :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. """ name = next(iter(spec)) - ns = dbname + ".$cmd" speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) if read_concern and not (session and session.in_transaction): if read_concern.level: spec["readConcern"] = read_concern.document @@ -271,21 +267,15 @@ def command( conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, otherwise rely on the server to return an error. - if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) - max_doc_size = 0 + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. + if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..e3875a5b99 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -99,7 +99,7 @@ ZlibContext, ZstdContext, ) - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode from pymongo.synchronous.auth import _AuthContext @@ -235,13 +235,12 @@ def unpin(self) -> None: self.close_conn(ConnectionClosedReason.STALE) def hello_cmd(self) -> dict[str, Any]: - # Handshake spec requires us to use OP_MSG+hello command for the - # initial handshake in load balanced or stable API mode. + # As of PYTHON-5713, always use OP_MSG for the handshake since all + # supported servers (MongoDB 4.2+, wire version >= 8) support it. + self.op_msg_enabled = True if self.opts.server_api or self.hello_ok or self.opts.load_balanced: - self.op_msg_enabled = True return {HelloCompat.CMD: 1} - else: - return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} + return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} def hello(self) -> Hello[dict[str, Any]]: return self._hello(None, None) @@ -447,7 +446,7 @@ def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: self._raise_connection_failure(error) - def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + def receive_message(self, request_id: Optional[int]) -> _OpMsg: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index dea1161afa..5902e957a2 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2305,10 +2305,10 @@ async def receive_message(request_id): # Discard the actual server response. await AsyncConnection.receive_message(conn, request_id) - # responseFlags bit 1 is QueryFailure. - msg = struct.pack(" bool: "test_gridfs.py", "test_gridfs_bucket.py", "test_gridfs_spec.py", + "test_handshake_unified.py", "test_heartbeat_monitoring.py", "test_index_management.py", "test_json_util_integration.py",