From ebb7b1fdae053ec649adb6d89ac3f28c57d903ab Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 7 May 2026 14:22:54 -0700 Subject: [PATCH 01/10] remove op_query --- pymongo/asynchronous/pool.py | 24 ++++++++++++++++----- pymongo/synchronous/pool.py | 24 ++++++++++++++++----- test/mockupdb/test_handshake.py | 38 +++++++++++++++++++++++++-------- 3 files changed, 67 insertions(+), 19 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..12412da641 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -46,6 +46,8 @@ MAX_MESSAGE_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, + MIN_SUPPORTED_SERVER_VERSION, + MIN_SUPPORTED_WIRE_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -235,13 +237,12 @@ async def unpin(self) -> None: await self.close_conn(ConnectionClosedReason.STALE) def hello_cmd(self) -> dict[str, Any]: - # Handshake spec requires us to use OP_MSG+hello command for the - # initial handshake in load balanced or stable API mode. + # As of PYTHON-5713, always use OP_MSG for the handshake since all + # supported servers (MongoDB 4.2+, wire version >= 8) support it. + self.op_msg_enabled = True if self.opts.server_api or self.hello_ok or self.opts.load_balanced: - self.op_msg_enabled = True return {HelloCompat.CMD: 1} - else: - return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} + return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} async def hello(self) -> Hello[dict[str, Any]]: return await self._hello(None, None) @@ -291,6 +292,19 @@ async def _hello( if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) + # OP_MSG requires wire version 6+. + if hello.max_wire_version < 6: + raise ConfigurationError( + "Server at %s:%d reports wire version %d, but this version of " + "PyMongo requires at least %d (MongoDB %s)." + % ( + self.address[0], + self.address[1] or 0, + hello.max_wire_version, + MIN_SUPPORTED_WIRE_VERSION, + MIN_SUPPORTED_SERVER_VERSION, + ) + ) self.is_writable = hello.is_writable self.max_wire_version = hello.max_wire_version self.max_bson_size = hello.max_bson_size diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..ca389df3d9 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -43,6 +43,8 @@ MAX_MESSAGE_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, + MIN_SUPPORTED_SERVER_VERSION, + MIN_SUPPORTED_WIRE_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -235,13 +237,12 @@ def unpin(self) -> None: self.close_conn(ConnectionClosedReason.STALE) def hello_cmd(self) -> dict[str, Any]: - # Handshake spec requires us to use OP_MSG+hello command for the - # initial handshake in load balanced or stable API mode. + # As of PYTHON-5713, always use OP_MSG for the handshake since all + # supported servers (MongoDB 4.2+, wire version >= 8) support it. + self.op_msg_enabled = True if self.opts.server_api or self.hello_ok or self.opts.load_balanced: - self.op_msg_enabled = True return {HelloCompat.CMD: 1} - else: - return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} + return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} def hello(self) -> Hello[dict[str, Any]]: return self._hello(None, None) @@ -291,6 +292,19 @@ def _hello( if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) + # OP_MSG requires wire version 6+. + if hello.max_wire_version < 6: + raise ConfigurationError( + "Server at %s:%d reports wire version %d, but this version of " + "PyMongo requires at least %d (MongoDB %s)." + % ( + self.address[0], + self.address[1] or 0, + hello.max_wire_version, + MIN_SUPPORTED_WIRE_VERSION, + MIN_SUPPORTED_SERVER_VERSION, + ) + ) self.is_writable = hello.is_writable self.max_wire_version = hello.max_wire_version self.max_bson_size = hello.max_bson_size diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index c2c978c4ad..42803c8a83 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -13,12 +13,13 @@ # limitations under the License. from __future__ import annotations +import re import unittest import pytest try: - from mockupdb import Command, MockupDB, OpMsg, OpMsgReply, OpQuery, OpReply, absent, go + from mockupdb import Command, MockupDB, OpMsg, OpMsgReply, OpReply, absent, go _HAVE_MOCKUPDB = True except ImportError: @@ -28,8 +29,8 @@ from bson.objectid import ObjectId from pymongo import MongoClient, has_c from pymongo import version as pymongo_version -from pymongo.common import MIN_SUPPORTED_WIRE_VERSION -from pymongo.errors import OperationFailure +from pymongo.common import MIN_SUPPORTED_SERVER_VERSION, MIN_SUPPORTED_WIRE_VERSION +from pymongo.errors import ConfigurationError, OperationFailure, ServerSelectionTimeoutError from pymongo.server_api import ServerApi, ServerApiVersion pytestmark = pytest.mark.mockupdb @@ -53,7 +54,7 @@ def _check_handshake_data(request): class TestHandshake(unittest.TestCase): def hello_with_option_helper(self, protocol, **kwargs): - hello = "ismaster" if isinstance(protocol(), OpQuery) else "hello" + hello = "hello" if ("apiVersion" in kwargs or "loadBalanced" in kwargs) else "ismaster" # `db.command("hello"|"ismaster")` commands are the same for primaries and # secondaries, so we only need one server. primary = MockupDB() @@ -165,7 +166,7 @@ def test_client_handshake_data(self): future = go(client.db.command, "whatever") for request in primary: - if request.matches(Command("ismaster")): + if request.matches("ismaster"): if request.client_port == heartbeat.client_port: # This is the monitor again, keep going. request.ok(primary_response) @@ -242,11 +243,10 @@ def test_handshake_versioned_api(self): self.hello_with_option_helper(Command, apiVersion="1") def test_handshake_not_either(self): - # If we don't specify either option then it should be using - # OP_QUERY for the initial step of the handshake. - self.hello_with_option_helper(Command) + # As of PYTHON-5713, always use OP_MSG for the initial handshake. + self.hello_with_option_helper(OpMsg) with self.assertRaisesRegex(AssertionError, "does not match"): - self.hello_with_option_helper(OpMsg) + self.hello_with_option_helper(Command) def test_handshake_max_wire(self): server = MockupDB() @@ -292,6 +292,26 @@ def responder(request): self.found_auth_msg, "Could not find authentication command with correct protocol" ) + def test_handshake_op_msg_not_supported(self): + # If a server responds with maxWireVersion < 6 (no OP_MSG support), + # the wire version error must surface to the user. + server = MockupDB() + server.autoresponds("ismaster", ok=1, ismaster=True, minWireVersion=0, maxWireVersion=5) + server.run() + self.addCleanup(server.stop) + + client = MongoClient(server.uri, serverSelectionTimeoutMS=500) + self.addCleanup(client.close) + + # The ConfigurationError from _hello() is stored as the server's error + # and surfaces inside ServerSelectionTimeoutError. + expected = re.escape( + "reports wire version 5, but this version of PyMongo requires at least " + "%d (MongoDB %s)." % (MIN_SUPPORTED_WIRE_VERSION, MIN_SUPPORTED_SERVER_VERSION) + ) + with self.assertRaisesRegex(ServerSelectionTimeoutError, expected): + client.db.command("ping") + if __name__ == "__main__": unittest.main() From 16868668202ed695cc9c066332ab68413d402629 Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 1 Jun 2026 16:29:10 -0700 Subject: [PATCH 02/10] address JY review comments on spec PR --- pymongo/asynchronous/pool.py | 25 ++++++++++--------------- pymongo/synchronous/pool.py | 25 ++++++++++--------------- test/mockupdb/test_handshake.py | 27 +++++++++++++++++---------- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 12412da641..5a56cd7523 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -47,7 +47,6 @@ MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, MIN_SUPPORTED_SERVER_VERSION, - MIN_SUPPORTED_WIRE_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -288,23 +287,19 @@ async def _hello( if performing_handshake: start = time.monotonic() - doc = await self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) + try: + doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) + except AutoReconnect as exc: + if performing_handshake: + raise ConfigurationError( + f"The server may have closed the connection because it does not " + f"support the wire protocol version used in the initial handshake. " + f"Ensure your MongoDB server version is {MIN_SUPPORTED_SERVER_VERSION} or newer." + ) from exc + raise if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) - # OP_MSG requires wire version 6+. - if hello.max_wire_version < 6: - raise ConfigurationError( - "Server at %s:%d reports wire version %d, but this version of " - "PyMongo requires at least %d (MongoDB %s)." - % ( - self.address[0], - self.address[1] or 0, - hello.max_wire_version, - MIN_SUPPORTED_WIRE_VERSION, - MIN_SUPPORTED_SERVER_VERSION, - ) - ) self.is_writable = hello.is_writable self.max_wire_version = hello.max_wire_version self.max_bson_size = hello.max_bson_size diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index ca389df3d9..6bc2385edd 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -44,7 +44,6 @@ MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, MIN_SUPPORTED_SERVER_VERSION, - MIN_SUPPORTED_WIRE_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -288,23 +287,19 @@ def _hello( if performing_handshake: start = time.monotonic() - doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) + try: + doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) + except AutoReconnect as exc: + if performing_handshake: + raise ConfigurationError( + f"The server may have closed the connection because it does not " + f"support the wire protocol version used in the initial handshake. " + f"Ensure your MongoDB server version is {MIN_SUPPORTED_SERVER_VERSION} or newer." + ) from exc + raise if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) - # OP_MSG requires wire version 6+. - if hello.max_wire_version < 6: - raise ConfigurationError( - "Server at %s:%d reports wire version %d, but this version of " - "PyMongo requires at least %d (MongoDB %s)." - % ( - self.address[0], - self.address[1] or 0, - hello.max_wire_version, - MIN_SUPPORTED_WIRE_VERSION, - MIN_SUPPORTED_SERVER_VERSION, - ) - ) self.is_writable = hello.is_writable self.max_wire_version = hello.max_wire_version self.max_bson_size = hello.max_bson_size diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 42803c8a83..bc115f97e9 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -293,23 +293,30 @@ def responder(request): ) def test_handshake_op_msg_not_supported(self): - # If a server responds with maxWireVersion < 6 (no OP_MSG support), - # the wire version error must surface to the user. + # If a server doesn't understand OP_MSG it will just close the connection, thus the driver should + # hint towards OP_MSG potentially not being supported in the error. server = MockupDB() - server.autoresponds("ismaster", ok=1, ismaster=True, minWireVersion=0, maxWireVersion=5) + + def responder(request): + if isinstance(request, OpMsg): + request.hangup() + return None + + server.autoresponds(responder) server.run() self.addCleanup(server.stop) client = MongoClient(server.uri, serverSelectionTimeoutMS=500) self.addCleanup(client.close) - # The ConfigurationError from _hello() is stored as the server's error - # and surfaces inside ServerSelectionTimeoutError. - expected = re.escape( - "reports wire version 5, but this version of PyMongo requires at least " - "%d (MongoDB %s)." % (MIN_SUPPORTED_WIRE_VERSION, MIN_SUPPORTED_SERVER_VERSION) - ) - with self.assertRaisesRegex(ServerSelectionTimeoutError, expected): + with self.assertRaisesRegex( + ServerSelectionTimeoutError, + re.escape( + f"The server may have closed the connection because it does not " + f"support the wire protocol version used in the initial handshake. " + f"Ensure your MongoDB server version is {MIN_SUPPORTED_SERVER_VERSION} or newer." + ), + ): client.db.command("ping") From 148e9136d24a6dd6882ab8402964c49928b7aae3 Mon Sep 17 00:00:00 2001 From: Iris Date: Mon, 1 Jun 2026 16:37:44 -0700 Subject: [PATCH 03/10] oops accidentally removed the await -- made changes to sync and copy pasted them back to async *facepalm* --- pymongo/asynchronous/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 5a56cd7523..55e7b58d16 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -288,7 +288,7 @@ async def _hello( if performing_handshake: start = time.monotonic() try: - doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) + doc = await self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) except AutoReconnect as exc: if performing_handshake: raise ConfigurationError( From 921f29d28259f2cf687c7b185abda2400e142937 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 10:52:30 -0700 Subject: [PATCH 04/10] remove opquery opreply and propagate original error to user --- pymongo/asynchronous/command_cursor.py | 20 +- pymongo/asynchronous/cursor.py | 39 ++-- pymongo/asynchronous/network.py | 29 +-- pymongo/asynchronous/pool.py | 16 +- pymongo/message.py | 262 +------------------------ pymongo/network_layer.py | 6 +- pymongo/response.py | 10 +- pymongo/synchronous/command_cursor.py | 20 +- pymongo/synchronous/cursor.py | 39 ++-- pymongo/synchronous/network.py | 29 +-- pymongo/synchronous/pool.py | 16 +- test/asynchronous/test_client.py | 8 +- test/asynchronous/test_pooling.py | 4 +- test/mockupdb/operations.py | 4 +- test/mockupdb/test_handshake.py | 27 ++- test/test_client.py | 8 +- test/test_pooling.py | 4 +- 17 files changed, 113 insertions(+), 428 deletions(-) diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 5a59c67a15..e6fe83a502 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -24,14 +24,13 @@ NoReturn, Optional, Sequence, - Union, ) from bson import CodecOptions, _convert_raw_document_lists_to_streams from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore +from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore from pymongo.response import PinnedResponse from pymongo.typings import _Address, _DocumentOut, _DocumentType @@ -145,7 +144,7 @@ async def _maybe_pin_connection(self, conn: AsyncConnection) -> None: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, @@ -189,15 +188,10 @@ async def _send_message(self, operation: _GetMore) -> None: if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: await self.close() @@ -333,7 +327,7 @@ def __init__( def _unpack_response( # type: ignore[override] self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[dict[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index a60c082ade..4e3adc1809 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -48,7 +48,6 @@ from pymongo.message import ( _GetMore, _OpMsg, - _OpReply, _Query, _RawBatchGetMore, _RawBatchQuery, @@ -864,7 +863,7 @@ def collation(self, collation: Optional[_CollationIn]) -> AsyncCursor[_DocumentT def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions, # type: ignore[type-arg] user_fields: Optional[Mapping[str, Any]] = None, @@ -1020,29 +1019,23 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: cmd_name = operation.name docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. + ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the @@ -1195,7 +1188,7 @@ def __init__( def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 5a5dc7fa2c..3de8c5343a 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -62,7 +62,7 @@ async def command( conn: AsyncConnection, dbname: str, spec: MutableMapping[str, Any], - is_mongos: bool, + is_mongos: bool, # noqa: ARG001 read_preference: Optional[_ServerMode], codec_options: CodecOptions[_DocumentType], session: Optional[AsyncClientSession], @@ -110,14 +110,10 @@ async def command( :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. """ name = next(iter(spec)) - ns = dbname + ".$cmd" speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) if read_concern and not (session and session.in_transaction): if read_concern.level: spec["readConcern"] = read_concern.document @@ -142,20 +138,15 @@ async def command( conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, otherwise rely on the server to return an error. - if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. + if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 55e7b58d16..4bbc7045b3 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -46,7 +46,6 @@ MAX_MESSAGE_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, - MIN_SUPPORTED_SERVER_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -103,7 +102,7 @@ ZlibContext, ZstdContext, ) - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode from pymongo.typings import _Address, _CollationIn @@ -287,16 +286,7 @@ async def _hello( if performing_handshake: start = time.monotonic() - try: - doc = await self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) - except AutoReconnect as exc: - if performing_handshake: - raise ConfigurationError( - f"The server may have closed the connection because it does not " - f"support the wire protocol version used in the initial handshake. " - f"Ensure your MongoDB server version is {MIN_SUPPORTED_SERVER_VERSION} or newer." - ) from exc - raise + doc = await self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) @@ -456,7 +446,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: await self._raise_connection_failure(error) - async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + async def receive_message(self, request_id: Optional[int]) -> _OpMsg: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. diff --git a/pymongo/message.py b/pymongo/message.py index b0d1ceb105..fdac2b4daa 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -39,14 +39,12 @@ import bson from bson import CodecOptions, _dict_to_bson, _make_c_string -from bson.int64 import Int64 from bson.raw_bson import ( _RAW_ARRAY_BSON_OPTIONS, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument, _inflate_bson, ) -from pymongo.hello import HelloCompat from pymongo.monitoring import _EventListeners try: @@ -57,12 +55,8 @@ _use_c = False from pymongo.errors import ( ConfigurationError, - CursorNotFound, DocumentTooLarge, - ExecutionTimeout, InvalidOperation, - NotPrimaryError, - OperationFailure, ProtocolError, ) from pymongo.read_preferences import ReadPreference, _ServerMode @@ -423,96 +417,6 @@ def _op_msg( command[identifier] = docs -def _query_impl( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], -) -> tuple[bytes, int]: - """Get an OP_QUERY message.""" - encoded = _dict_to_bson(query, False, opts) - if field_selector: - efs = _dict_to_bson(field_selector, False, opts) - else: - efs = b"" - max_bson_size = max(len(encoded), len(efs)) - return ( - b"".join( - [ - _pack_int(options), - bson._make_c_string(collection_name), - _pack_int(num_to_skip), - _pack_int(num_to_return), - encoded, - efs, - ] - ), - max_bson_size, - ) - - -def _query_compressed( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], - ctx: Union[SnappyContext, ZlibContext, ZstdContext], -) -> tuple[int, bytes, int]: - """Internal compressed query message helper.""" - op_query, max_bson_size = _query_impl( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - rid, msg = _compress(2004, op_query, ctx) - return rid, msg, max_bson_size - - -def _query_uncompressed( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], -) -> tuple[int, bytes, int]: - """Internal query message helper.""" - op_query, max_bson_size = _query_impl( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - rid, msg = __pack_message(2004, op_query) - return rid, msg, max_bson_size - - -if _use_c: - _query_uncompressed = _cmessage._query_message - - -def _query( - options: int, - collection_name: str, - num_to_skip: int, - num_to_return: int, - query: Mapping[str, Any], - field_selector: Optional[Mapping[str, Any]], - opts: CodecOptions[Any], - ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, -) -> tuple[int, bytes, int]: - """Get a **query** message.""" - if ctx: - return _query_compressed( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts, ctx - ) - return _query_uncompressed( - options, collection_name, num_to_skip, num_to_return, query, field_selector, opts - ) - - _pack_long_long = struct.Struct(" list[bytes | memoryview]: - """Check the response header from the database, without decoding BSON. - - Check the response for errors and unpack. - - Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or - OperationFailure. - - :param cursor_id: cursor_id we sent to get this response - - used for raising an informative exception when we get cursor id not - valid at server response. - """ - if self.flags & 1: - # Shouldn't get this response if we aren't doing a getMore - if cursor_id is None: - raise ProtocolError("No cursor id for getMore operation") - - # Fake a getMore command response. OP_GET_MORE provides no - # document. - msg = "Cursor not found, cursor id: %d" % (cursor_id,) - errobj = {"ok": 0, "errmsg": msg, "code": 43} - raise CursorNotFound(msg, 43, errobj) - elif self.flags & 2: - error_object: dict[str, Any] = bson.BSON(self.documents).decode() - # Fake the ok field if it doesn't exist. - error_object.setdefault("ok", 0) - if error_object["$err"].startswith(HelloCompat.LEGACY_ERROR): - raise NotPrimaryError(error_object["$err"], error_object) - elif error_object.get("code") == 50: - default_msg = "operation exceeded time limit" - raise ExecutionTimeout( - error_object.get("$err", default_msg), error_object.get("code"), error_object - ) - raise OperationFailure( - "database error: %s" % error_object.get("$err"), - error_object.get("code"), - error_object, - ) - if self.documents: - return [self.documents] - return [] - - def unpack_response( - self, - cursor_id: Optional[int] = None, - codec_options: CodecOptions[Any] = _UNICODE_REPLACE_CODEC_OPTIONS, - user_fields: Optional[Mapping[str, Any]] = None, - legacy_response: bool = False, - ) -> list[dict[str, Any]]: - """Unpack a response from the database and decode the BSON document(s). - - Check the response for errors and unpack, returning a dictionary - containing the response data. - - Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or - OperationFailure. - - :param cursor_id: cursor_id we sent to get this response - - used for raising an informative exception when we get cursor id not - valid at server response - :param codec_options: an instance of - :class:`~bson.codec_options.CodecOptions` - :param user_fields: Response fields that should be decoded - using the TypeDecoders from codec_options, passed to - bson._decode_all_selective. - """ - self.raw_response(cursor_id) - if legacy_response: - return bson.decode_all(self.documents, codec_options) - return bson._decode_all_selective(self.documents, codec_options, user_fields) - - def command_response(self, codec_options: CodecOptions[Any]) -> dict[str, Any]: - """Unpack a command response.""" - docs = self.unpack_response(codec_options=codec_options) - assert self.number_returned == 1 - return docs[0] - - def raw_command_response(self) -> NoReturn: - """Return the bytes of the command response.""" - # This should never be called on _OpReply. - raise NotImplementedError - - @property - def more_to_come(self) -> bool: - """Is the moreToCome bit set on this response?""" - return False - - @classmethod - def unpack(cls, msg: bytes | memoryview) -> _OpReply: - """Construct an _OpReply from raw bytes.""" - # PYTHON-945: ignore starting_from field. - flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg) - - documents = msg[20:] - return cls(flags, cursor_id, number_returned, documents) - - class _OpMsg: """A MongoDB OP_MSG response message.""" @@ -1541,8 +1330,7 @@ def unpack(cls, msg: bytes | memoryview) -> _OpMsg: return cls(flags, payload_document) -_UNPACK_REPLY: dict[int, Callable[[bytes | memoryview], Union[_OpReply, _OpMsg]]] = { - _OpReply.OP_CODE: _OpReply.unpack, +_UNPACK_REPLY: dict[int, Callable[[bytes | memoryview], _OpMsg]] = { _OpMsg.OP_CODE: _OpMsg.unpack, } @@ -1680,54 +1468,20 @@ def as_command( def get_message( self, read_preference: _ServerMode, conn: _AgnosticConnection, use_cmd: bool = False ) -> tuple[int, bytes, int]: - """Get a query message, possibly setting the secondaryOk bit.""" + """Get a query message""" # Use the read_preference decided by _socket_from_server. self.read_preference = read_preference - if read_preference.mode: - # Set the secondaryOk bit. - flags = self.flags | 4 - else: - flags = self.flags - - ns = self.namespace() - spec = self.spec - if use_cmd: - spec = self.as_command(conn)[0] - request_id, msg, size, _ = _op_msg( - 0, - spec, - self.db, - read_preference, - self.codec_options, - ctx=conn.compression_context, - ) - return request_id, msg, size - - # OP_QUERY treats ntoreturn of -1 and 1 the same, return - # one document and close the cursor. We have to use 2 for - # batch size if 1 is specified. - ntoreturn = self.batch_size == 1 and 2 or self.batch_size - if self.limit: - if ntoreturn: - ntoreturn = min(self.limit, ntoreturn) - else: - ntoreturn = self.limit - - if conn.is_mongos: - assert isinstance(spec, MutableMapping) - spec = _maybe_add_read_preference(spec, read_preference) - - return _query( - flags, - ns, - self.ntoskip, - ntoreturn, + spec = self.as_command(conn)[0] + request_id, msg, size, _ = _op_msg( + 0, spec, - None if use_cmd else self.fields, + self.db, + read_preference, self.codec_options, ctx=conn.compression_context, ) + return request_id, msg, size class _GetMore: diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 7c62a251f8..f4e2d2669b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -35,7 +35,7 @@ from pymongo.common import MAX_MESSAGE_SIZE from pymongo.compression_support import decompress from pymongo.errors import ProtocolError, _OperationCancelled -from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply +from pymongo.message import _UNPACK_REPLY, _OpMsg from pymongo.socket_checker import _errno_from_exception try: @@ -696,7 +696,7 @@ async def async_receive_message( conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE, -) -> Union[_OpReply, _OpMsg]: +) -> _OpMsg: """Receive a raw BSON message or raise socket.error.""" timeout: Optional[Union[float, int]] timeout = conn.conn.gettimeout @@ -745,7 +745,7 @@ async def async_receive_message( def receive_message( conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE -) -> Union[_OpReply, _OpMsg]: +) -> _OpMsg: """Receive a raw BSON message or raise socket.error.""" if _csot.get_timeout(): deadline = _csot.get_deadline() diff --git a/pymongo/response.py b/pymongo/response.py index 211ddf2354..1bd2411e14 100644 --- a/pymongo/response.py +++ b/pymongo/response.py @@ -15,12 +15,12 @@ """Represent a response from the server.""" from __future__ import annotations -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence if TYPE_CHECKING: from datetime import timedelta - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.typings import _Address, _AgnosticConnection, _DocumentOut @@ -29,7 +29,7 @@ class Response: def __init__( self, - data: Union[_OpMsg, _OpReply], + data: _OpMsg, address: _Address, request_id: int, duration: Optional[timedelta], @@ -52,7 +52,7 @@ def __init__( self._docs = docs @property - def data(self) -> Union[_OpMsg, _OpReply]: + def data(self) -> _OpMsg: """Server response's raw BSON bytes.""" return self._data @@ -87,7 +87,7 @@ class PinnedResponse(Response): def __init__( self, - data: Union[_OpMsg, _OpReply], + data: _OpMsg, address: _Address, conn: _AgnosticConnection, request_id: int, diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 34f60c6540..ba21bdc33e 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -24,13 +24,12 @@ NoReturn, Optional, Sequence, - Union, ) from bson import CodecOptions, _convert_raw_document_lists_to_streams from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore +from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore from pymongo.response import PinnedResponse from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase from pymongo.typings import _Address, _DocumentOut, _DocumentType @@ -145,7 +144,7 @@ def _maybe_pin_connection(self, conn: Connection) -> None: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, @@ -189,15 +188,10 @@ def _send_message(self, operation: _GetMore) -> None: if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: self.close() @@ -333,7 +327,7 @@ def __init__( def _unpack_response( # type: ignore[override] self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[dict[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 5a721d8e06..8085d58669 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -46,7 +46,6 @@ from pymongo.message import ( _GetMore, _OpMsg, - _OpReply, _Query, _RawBatchGetMore, _RawBatchQuery, @@ -862,7 +861,7 @@ def collation(self, collation: Optional[_CollationIn]) -> Cursor[_DocumentType]: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions, # type: ignore[type-arg] user_fields: Optional[Mapping[str, Any]] = None, @@ -1018,29 +1017,23 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: cmd_name = operation.name docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. + ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the @@ -1191,7 +1184,7 @@ def __init__(self, collection: Collection[_DocumentType], *args: Any, **kwargs: def _unpack_response( self, - response: Union[_OpReply, _OpMsg], + response: _OpMsg, cursor_id: Optional[int], codec_options: CodecOptions[Mapping[str, Any]], user_fields: Optional[Mapping[str, Any]] = None, diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7d9bca4d58..86dc1a47d8 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -62,7 +62,7 @@ def command( conn: Connection, dbname: str, spec: MutableMapping[str, Any], - is_mongos: bool, + is_mongos: bool, # noqa: ARG001 read_preference: Optional[_ServerMode], codec_options: CodecOptions[_DocumentType], session: Optional[ClientSession], @@ -110,14 +110,10 @@ def command( :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. """ name = next(iter(spec)) - ns = dbname + ".$cmd" speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) if read_concern and not (session and session.in_transaction): if read_concern.level: spec["readConcern"] = read_concern.document @@ -142,20 +138,15 @@ def command( conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, otherwise rely on the server to return an error. - if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. + if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 6bc2385edd..e3875a5b99 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -43,7 +43,6 @@ MAX_MESSAGE_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, - MIN_SUPPORTED_SERVER_VERSION, ORDERED_TYPES, ) from pymongo.errors import ( # type:ignore[attr-defined] @@ -100,7 +99,7 @@ ZlibContext, ZstdContext, ) - from pymongo.message import _OpMsg, _OpReply + from pymongo.message import _OpMsg from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode from pymongo.synchronous.auth import _AuthContext @@ -287,16 +286,7 @@ def _hello( if performing_handshake: start = time.monotonic() - try: - doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) - except AutoReconnect as exc: - if performing_handshake: - raise ConfigurationError( - f"The server may have closed the connection because it does not " - f"support the wire protocol version used in the initial handshake. " - f"Ensure your MongoDB server version is {MIN_SUPPORTED_SERVER_VERSION} or newer." - ) from exc - raise + doc = self.command("admin", cmd, publish_events=False, exhaust_allowed=awaitable) if performing_handshake: self.connect_rtt = time.monotonic() - start hello = Hello(doc, awaitable=awaitable) @@ -456,7 +446,7 @@ def send_message(self, message: bytes, max_doc_size: int) -> None: except BaseException as error: self._raise_connection_failure(error) - def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]: + def receive_message(self, request_id: Optional[int]) -> _OpMsg: """Receive a raw BSON message or raise ConnectionFailure. If any exception is raised, the socket is closed. diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index dea1161afa..5902e957a2 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2305,10 +2305,10 @@ async def receive_message(request_id): # Discard the actual server response. await AsyncConnection.receive_message(conn, request_id) - # responseFlags bit 1 is QueryFailure. - msg = struct.pack(" Date: Wed, 3 Jun 2026 11:48:02 -0700 Subject: [PATCH 05/10] remove dead code / fix typing tests --- pymongo/asynchronous/server.py | 11 +---------- pymongo/synchronous/server.py | 11 +---------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..c93eeb413f 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -265,16 +265,7 @@ async def run_operation( duration = datetime.now() - start # Must publish in find / getMore / explain command response # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..73d782092e 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -265,16 +265,7 @@ def run_operation( duration = datetime.now() - start # Must publish in find / getMore / explain command response # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, From 664ece190bb6be4c38238f6b3297fc20b2544bf4 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 12:41:24 -0700 Subject: [PATCH 06/10] remove opreply from test --- test/mockupdb/test_network_disconnect_primary.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/mockupdb/test_network_disconnect_primary.py b/test/mockupdb/test_network_disconnect_primary.py index b5ccd5276f..5417b1e926 100644 --- a/test/mockupdb/test_network_disconnect_primary.py +++ b/test/mockupdb/test_network_disconnect_primary.py @@ -18,7 +18,7 @@ import pytest try: - from mockupdb import Future, MockupDB, OpReply, going, wait_until + from mockupdb import Future, MockupDB, OpMsgReply, going, wait_until _HAVE_MOCKUPDB = True except ImportError: @@ -44,7 +44,7 @@ def test_network_disconnect_primary(self): self.addCleanup(server.stop) hosts = [server.address_string for server in (primary, secondary)] - primary_response = OpReply( + primary_response = OpMsgReply( ismaster=True, setName="rs", hosts=hosts, From 9b0cb2ec0d406e20e25fa20af09c692fc2540b57 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 15:24:01 -0700 Subject: [PATCH 07/10] add unified test? --- test/handshake/unified/remove-op_query.json | 74 +++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 test/handshake/unified/remove-op_query.json diff --git a/test/handshake/unified/remove-op_query.json b/test/handshake/unified/remove-op_query.json new file mode 100644 index 0000000000..c50367a37e --- /dev/null +++ b/test/handshake/unified/remove-op_query.json @@ -0,0 +1,74 @@ +{ + "description": "remove-op_query", + "schemaVersion": "1.3", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "uriOptions": { + "appName": "op-msg-handshake-test", + "serverSelectionTimeoutMS": 500 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + } + ], + "tests": [ + { + "description": "server closing connection during initial hello produces an error", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "op-msg-handshake-test", + "closeConnection": true + } + } + } + }, + { + "name": "runCommand", + "object": "database", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectError": { + "errorContains": "connection closed", + "isClientError": false + } + } + ] + } + ] +} From 2a17752b21187162920e0fa8c1a252292de9195a Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 16:09:16 -0700 Subject: [PATCH 08/10] wait do the unified tests need to be pulled out to run? :thinking: --- test/asynchronous/test_handshake_unified.py | 31 +++++++++++++++++++++ test/test_handshake_unified.py | 31 +++++++++++++++++++++ tools/synchro.py | 1 + 3 files changed, 63 insertions(+) create mode 100644 test/asynchronous/test_handshake_unified.py create mode 100644 test/test_handshake_unified.py diff --git a/test/asynchronous/test_handshake_unified.py b/test/asynchronous/test_handshake_unified.py new file mode 100644 index 0000000000..e2c94bd9d1 --- /dev/null +++ b/test/asynchronous/test_handshake_unified.py @@ -0,0 +1,31 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the handshake unified spec tests.""" +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test import unittest +from test.unified_format import generate_test_classes, get_test_path + +_IS_SYNC = True + +# Generate unified tests. +globals().update(generate_test_classes(get_test_path("handshake", "unified"), module=__name__)) + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_handshake_unified.py b/test/test_handshake_unified.py new file mode 100644 index 0000000000..e2c94bd9d1 --- /dev/null +++ b/test/test_handshake_unified.py @@ -0,0 +1,31 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the handshake unified spec tests.""" +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test import unittest +from test.unified_format import generate_test_classes, get_test_path + +_IS_SYNC = True + +# Generate unified tests. +globals().update(generate_test_classes(get_test_path("handshake", "unified"), module=__name__)) + +if __name__ == "__main__": + unittest.main() diff --git a/tools/synchro.py b/tools/synchro.py index 39250ab14a..b792ba7856 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -242,6 +242,7 @@ def async_only_test(f: str) -> bool: "test_gridfs.py", "test_gridfs_bucket.py", "test_gridfs_spec.py", + "test_handshake_unified.py", "test_heartbeat_monitoring.py", "test_index_management.py", "test_json_util_integration.py", From f8f5fc8d6202419feb6c1f5564a73dd4bd36acfb Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 17:12:04 -0700 Subject: [PATCH 09/10] fix typing and remove opreply after merge --- pymongo/asynchronous/network.py | 9 ++++----- pymongo/synchronous/network.py | 8 ++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index cb89bba995..7d1067371e 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -32,7 +32,7 @@ from pymongo import _csot, helpers_shared, message from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION -from pymongo.message import _OpMsg, _OpReply +from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -75,7 +75,7 @@ async def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -84,7 +84,7 @@ async def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[Union[_OpReply, _OpMsg]] = None + reply: Optional[_OpMsg] = None docs: list[_DocumentOut] = [] with _CommandTelemetry(client, conn, spec, dbname, request_id, start) as cmd_telemetry: @@ -205,7 +205,7 @@ async def command( parse_write_concern_error: bool = False, collation: Optional[_CollationIn] = None, compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, + use_op_msg: bool = False, # noqa: ARG001 unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, exhaust_allowed: bool = False, @@ -267,7 +267,6 @@ async def command( conn.apply_timeout(client, spec) _csot.apply_write_concern(spec, write_concern) - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 request_id, msg, size, max_doc_size = message._op_msg( diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 49ee39314e..233a05cf84 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -32,7 +32,7 @@ from pymongo import _csot, helpers_shared, message from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION -from pymongo.message import _OpMsg, _OpReply +from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -75,7 +75,7 @@ def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -84,7 +84,7 @@ def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[Union[_OpReply, _OpMsg]] = None + reply: Optional[_OpMsg] = None docs: list[_DocumentOut] = [] with _CommandTelemetry(client, conn, spec, dbname, request_id, start) as cmd_telemetry: @@ -205,7 +205,7 @@ def command( parse_write_concern_error: bool = False, collation: Optional[_CollationIn] = None, compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, + use_op_msg: bool = False, # noqa: ARG001 unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, exhaust_allowed: bool = False, From 08ba1677b3c5d157feed8917cbaf4a086a8de062 Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 3 Jun 2026 17:25:35 -0700 Subject: [PATCH 10/10] rename test file --- .../unified/{remove-op_query.json => op_msg-not-supported.json} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename test/handshake/unified/{remove-op_query.json => op_msg-not-supported.json} (97%) diff --git a/test/handshake/unified/remove-op_query.json b/test/handshake/unified/op_msg-not-supported.json similarity index 97% rename from test/handshake/unified/remove-op_query.json rename to test/handshake/unified/op_msg-not-supported.json index c50367a37e..b0fb30f245 100644 --- a/test/handshake/unified/remove-op_query.json +++ b/test/handshake/unified/op_msg-not-supported.json @@ -1,5 +1,5 @@ { - "description": "remove-op_query", + "description": "op_msg not supported", "schemaVersion": "1.3", "runOnRequirements": [ {