From 9bf8a1de169a50d31593a5ca1c4ee31c452e18de Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 30 Sep 2025 18:38:26 -0700 Subject: [PATCH 1/8] Reuse ConnectionSource to avoid extra server selection. JAVA-5974 --- .../operation/ChangeStreamBatchCursor.java | 51 ++++++++++++++++++- .../operation/SyncOperationHelper.java | 4 ++ .../ChangeStreamBatchCursorTest.java | 23 ++++++--- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index c4bd72a4775..f10cd09c68b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -19,10 +19,14 @@ import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoException; import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.connection.OperationContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonTimestamp; @@ -244,9 +248,9 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null; }); - wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding)).getWrapped(); binding.release(); // release the new change stream batch cursor's reference to the binding } @@ -257,4 +261,49 @@ private boolean hasPreviousNextTimedOut() { private static boolean isTimeoutException(final Throwable exception) { return exception instanceof MongoOperationTimeoutException; } + + private static class SourceAwareReadBinding implements ReadBinding { + private final ConnectionSource source; + private final ReadBinding binding; + + SourceAwareReadBinding(final ConnectionSource source, final ReadBinding binding) { + this.source = source; + this.binding = binding; + } + + @Override + public ReadPreference getReadPreference() { + return binding.getReadPreference(); + } + + @Override + public ConnectionSource getReadConnectionSource() { + return source; + } + + @Override + public ConnectionSource getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference) { + throw Assertions.fail(); + } + + @Override + public int getCount() { + return binding.getCount(); + } + + @Override + public ReadBinding retain() { + return binding.retain(); + } + + @Override + public int release() { + return binding.release(); + } + + @Override + public OperationContext getOperationContext() { + return binding.getOperationContext(); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java index 6d013df59ba..f8bbacf53ed 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java @@ -97,6 +97,10 @@ interface CommandWriteTransformer { private static final BsonDocumentCodec BSON_DOCUMENT_CODEC = new BsonDocumentCodec(); + /** + * Gets a {@link ConnectionSource} from the {@link ReadBinding#getReadConnectionSource()} and executes + * the provided {@link CallableWithSource} with it. + */ static T withReadConnectionSource(final ReadBinding binding, final CallableWithSource callable) { ConnectionSource source = binding.getReadConnectionSource(); try { diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java index 48c3a50e79a..7835af045dc 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -153,7 +153,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { verifyNoMoreInteractions(commandBatchCursor); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); verify(changeStreamOperation, times(1)).getDecoder(); - verify(changeStreamOperation, times(1)).execute(readBinding); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class)); verifyNoMoreInteractions(changeStreamOperation); verify(newCommandBatchCursor, times(1)).next(); verify(newCommandBatchCursor, atLeastOnce()).getPostBatchResumeToken(); @@ -180,7 +180,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { void shouldPropagateAnyErrorsOccurredInAggregateOperation() { when(commandBatchCursor.next()).thenThrow(new MongoOperationTimeoutException("timeout")); MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()); - when(changeStreamOperation.execute(readBinding)).thenThrow(resumableError); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow(resumableError); ChangeStreamBatchCursor cursor = createChangeStreamCursor(); //when @@ -208,11 +208,12 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() { clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); //second next operation times out on resume attempt when creating change stream - when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoOperationTimeoutException("timeout during resumption")); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow( + new MongoOperationTimeoutException("timeout during resumption")); assertThrows(MongoOperationTimeoutException.class, cursor::next); - clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation); + clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); - doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(readBinding); + doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class)); //when third operation succeeds to resume and call next List next = cursor.next(); @@ -242,7 +243,8 @@ void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() { clearInvocations(commandBatchCursor, newCommandBatchCursor, timeoutContext, changeStreamOperation, readBinding); //when second next operation errors on resume attempt when creating change stream - when(changeStreamOperation.execute(readBinding)).thenThrow(new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenThrow( + new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); assertThrows(MongoNotPrimaryException.class, cursor::next); //then @@ -280,7 +282,8 @@ private void verifyNoResumeAttemptCalled() { private void verifyResumeAttemptCalled() { verify(commandBatchCursor, times(1)).close(); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - verify(changeStreamOperation, times(1)).execute(readBinding); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class)); + verify(readBinding, times(1)).getReadConnectionSource(); verifyNoMoreInteractions(commandBatchCursor); } @@ -326,7 +329,11 @@ void setUp() { changeStreamOperation = mock(ChangeStreamOperation.class); when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec()); doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - when(changeStreamOperation.execute(readBinding)).thenReturn(newChangeStreamCursor); + when(changeStreamOperation.execute(any(ReadBinding.class))).thenAnswer(invocation -> { + ReadBinding binding = invocation.getArgument(0); + binding.getReadConnectionSource(); + return newChangeStreamCursor; + }); } } From b125e5ad4c39d3ca47aaa5b2d35b1aa561e2f020 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 17 Oct 2025 14:45:36 -0700 Subject: [PATCH 2/8] Retain wrapped ConnectionSource getReadConnectionSource. --- .../com/mongodb/internal/operation/ChangeStreamBatchCursor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index f10cd09c68b..f5c483d7d85 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -278,6 +278,7 @@ public ReadPreference getReadPreference() { @Override public ConnectionSource getReadConnectionSource() { + source.retain(); return source; } From 835d556950d528a211807667e8d7e8af2eb110b9 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 17 Oct 2025 14:54:52 -0700 Subject: [PATCH 3/8] Add comments for clarity. --- .../mongodb/internal/operation/ChangeStreamBatchCursor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index f5c483d7d85..61708473143 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -248,6 +248,8 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + // The same source is pined to resulting CommandBatchCursor, so we need to wrap the binding + // to return the same source to avoid double-selection of the server. wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null; }); @@ -262,6 +264,9 @@ private static boolean isTimeoutException(final Throwable exception) { return exception instanceof MongoOperationTimeoutException; } + /** + * Does not retain wrapped {link @ReadBinding} as it serves as a wrapper only. + */ private static class SourceAwareReadBinding implements ReadBinding { private final ConnectionSource source; private final ReadBinding binding; From ccd52af56820eccd9f8c20bba49debf1b4b07303 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Fri, 17 Oct 2025 15:44:39 -0700 Subject: [PATCH 4/8] Update comment. --- .../com/mongodb/internal/operation/ChangeStreamBatchCursor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 61708473143..b51e6c02696 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -248,7 +248,7 @@ private void resumeChangeStream() { withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); - // The same source is pined to resulting CommandBatchCursor, so we need to wrap the binding + // The same source is pinned to resulting CommandBatchCursor, so we need to wrap the binding // to return the same source to avoid double-selection of the server. wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding))).getWrapped(); return null; From 830b00995ef87b76f229d9814d34af10496d54b4 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 14 Nov 2025 18:12:37 -0800 Subject: [PATCH 5/8] Add integration tests. --- .../AsyncChangeStreamBatchCursor.java | 55 ++++++++- .../ChangeStreamBatchCursorTest.java | 11 +- .../client/ChangeStreamFunctionalTest.java | 29 +++++ .../AbstractChangeSteamFunctionalTest.java | 105 ++++++++++++++++++ .../client/ChangeSteamFunctionalTest.java | 27 +++++ 5 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index a144888f859..4ea2c8d1cee 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -18,9 +18,12 @@ import com.mongodb.MongoException; +import com.mongodb.ReadPreference; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor; import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.connection.OperationContext; import com.mongodb.lang.NonNull; @@ -232,8 +235,9 @@ private void retryOperation(final AsyncBlock asyncBlock, } else { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, assertNotNull(source).getServerDescription().getMaxWireVersion()); - source.release(); - changeStreamOperation.executeAsync(binding, operationContext, (asyncBatchCursor, t1) -> { + // The same source is pined to resulting AsyncCommandBatchCursor, so we need to wrap the binding + // to return the same source to avoid double-selection of the server. + changeStreamOperation.executeAsync(new AsyncSourceAwareReadBinding(source, binding), operationContext, (asyncBatchCursor, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { @@ -242,6 +246,7 @@ private void retryOperation(final AsyncBlock asyncBlock, operationContext); } finally { try { + source.release(); binding.release(); // release the new change stream batch cursor's reference to the binding } finally { resumeableOperation(asyncBlock, callback, operationContext, tryNext); @@ -252,5 +257,51 @@ private void retryOperation(final AsyncBlock asyncBlock, } }); } + + /** + * Does not retain wrapped {@link AsyncReadBinding} as it serves as a wrapper only. + */ + private static class AsyncSourceAwareReadBinding implements AsyncReadBinding { + private final AsyncConnectionSource source; + private final AsyncReadBinding binding; + + AsyncSourceAwareReadBinding(final AsyncConnectionSource source, final AsyncReadBinding binding) { + this.source = source; + this.binding = binding; + } + + @Override + public ReadPreference getReadPreference() { + return binding.getReadPreference(); + } + + @Override + public void getReadConnectionSource(final OperationContext operationContext, final SingleResultCallback callback) { + source.retain(); + callback.onResult(source, null); + } + + @Override + public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference, + final OperationContext operationContext, + final SingleResultCallback callback) { + throw Assertions.fail(); + } + + @Override + public AsyncReadBinding retain() { + return binding.retain(); + } + + @Override + public int release() { + return binding.release(); + } + + @Override + public int getCount() { + return binding.getCount(); + } + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java index 06f92a9ef2c..69ffa051640 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -25,6 +25,7 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.ReadWriteBinding; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.NoOpSessionContext; import com.mongodb.internal.connection.OperationContext; @@ -275,7 +276,7 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() { when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow( new MongoOperationTimeoutException("timeout during resumption")); assertThrows(MongoOperationTimeoutException.class, cursor::next); - clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation); + clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding); doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class), any()); @@ -344,11 +345,12 @@ private void verifyNoResumeAttemptCalled() { private void verifyResumeAttemptCalled() { verify(cursor, times(1)).close(any()); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - verify(changeStreamOperation, times(1)).execute(eq(readBinding), any()); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any()); verifyNoMoreInteractions(cursor); verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any()); + // Verify server selection is done once for the resume attempt. verify(readBinding, times(1)).getReadConnectionSource(any()); - verifyNoMoreInteractions(commandBatchCursor); + verifyNoMoreInteractions(cursor); } @BeforeEach @@ -399,7 +401,8 @@ void setUp() { doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenAnswer(invocation -> { ReadBinding binding = invocation.getArgument(0); - binding.getReadConnectionSource(); + OperationContext operationContext = invocation.getArgument(1); + binding.getReadConnectionSource(operationContext); return newChangeStreamCursor; }); } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java new file mode 100644 index 00000000000..3b204150019 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.AbstractChangeSteamFunctionalTest; +import com.mongodb.client.MongoClient; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; + +public class ChangeStreamFunctionalTest extends AbstractChangeSteamFunctionalTest { + @Override + protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) { + return new SyncMongoClient(mongoClientSettings); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java new file mode 100644 index 00000000000..20f4314497c --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.ClusterFixture; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.test.CollectionHelper; +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.bson.codecs.BsonDocumentCodec; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.client.Fixture.getDefaultDatabaseName; +import static java.lang.String.format; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * The {@link ChangeStreamProseTest}, which is defined only for sync driver, should be migrated to this class. + * Once this done, this class should be renamed to ChangeStreamProseTest. + */ +public abstract class AbstractChangeSteamFunctionalTest { + + private static final String FAIL_COMMAND_NAME = "failCommand"; + private static final MongoNamespace NAMESPACE = new MongoNamespace(getDefaultDatabaseName(), "test"); + private final CollectionHelper collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE); + + protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings); + + @Test + public void shouldDoOneServerSelectionForResumeAttempt() { + //given + assumeTrue(ClusterFixture.isDiscoverableReplicaSet()); + AtomicInteger serverSelectionCounter = new AtomicInteger(); + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + try (MongoClient mongoClient = createMongoClient(Fixture.getMongoClientSettingsBuilder() + .applyToClusterSettings(builder -> builder.serverSelector(clusterDescription -> { + serverSelectionCounter.incrementAndGet(); + return clusterDescription.getServerDescriptions(); + })).build())) { + + MongoCollection collection = mongoClient + .getDatabase(NAMESPACE.getDatabaseName()) + .getCollection(NAMESPACE.getCollectionName()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + + " mode: {" + + " times: 1" + + " }," + + " data: {" + + " failCommands: ['getMore']," + + " errorCode: 10107," + + " errorLabels: ['ResumableChangeStreamError']" + + " }" + + "}"); + // We insert document here, because async cursor performs aggregate and getMore right after we call cursor() + collection.insertOne(Document.parse("{ x: 1 }")); + serverSelectionCounter.set(0); + + try (MongoChangeStreamCursor> cursor = collection.watch() + .batchSize(0) + .startAtOperationTime(startTime) + .cursor()) { + + //when + ChangeStreamDocument changeStreamDocument = cursor.next(); + //then + assertNotNull(changeStreamDocument); + int actualCountOfServerSelections = serverSelectionCounter.get(); + assertEquals(2, actualCountOfServerSelections, + format("Expected only one additional server selection due to resume after resumable error, but was %s", + actualCountOfServerSelections)); + } + } + } + + @AfterEach + public void tearDown() throws InterruptedException { + ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME); + collectionHelper.drop(); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java b/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java new file mode 100644 index 00000000000..c4ec6286ce2 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.MongoClientSettings; + +public class ChangeSteamFunctionalTest extends AbstractChangeSteamFunctionalTest { + + @Override + protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) { + return MongoClients.create(mongoClientSettings); + } +} From 8befb71ff62929de779bb8b53c876c43e52b308b Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 14 Nov 2025 18:22:24 -0800 Subject: [PATCH 6/8] Remove unnecessary blank line in SyncOperationHelper.java --- .../main/com/mongodb/internal/operation/SyncOperationHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java index 694c12e8438..15406d0b7cb 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java @@ -97,7 +97,6 @@ interface CommandWriteTransformer { private static final BsonDocumentCodec BSON_DOCUMENT_CODEC = new BsonDocumentCodec(); - static T withReadConnectionSource(final ReadBinding binding, final OperationContext operationContext, final CallableWithSource callable) { From 15152d4fcf39875364ec1f946424e5339506e711 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 14 Nov 2025 18:28:00 -0800 Subject: [PATCH 7/8] Change comments. --- .../internal/operation/AsyncChangeStreamBatchCursor.java | 4 ++-- .../mongodb/internal/operation/ChangeStreamBatchCursor.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index 4ea2c8d1cee..ce7127e0dc3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -235,8 +235,8 @@ private void retryOperation(final AsyncBlock asyncBlock, } else { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, assertNotNull(source).getServerDescription().getMaxWireVersion()); - // The same source is pined to resulting AsyncCommandBatchCursor, so we need to wrap the binding - // to return the same source to avoid double-selection of the server. + // We wrap the binding so that the selected AsyncConnectionSource is reused, preventing redundant server selection. + // Consequently, the same AsyncConnectionSource remains pinned to the resulting AsyncCommandCursor. changeStreamOperation.executeAsync(new AsyncSourceAwareReadBinding(source, binding), operationContext, (asyncBatchCursor, t1) -> { if (t1 != null) { callback.onResult(null, t1); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 7491cea0ce4..cf9f1dcf6c4 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -254,8 +254,8 @@ private void resumeChangeStream(final OperationContext operationContext) { wrapped.close(operationContextWithDefaultMaxTime); withReadConnectionSource(binding, operationContext, (source, operationContextWithMinRtt) -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); - // The same source is pinned to resulting CommandBatchCursor, so we need to wrap the binding - // to return the same source to avoid double-selection of the server. + // We wrap the binding so that the selected ConnectionSource is reused, preventing redundant server selection. + // Consequently, the same ConnectionSource remains pinned to the resulting CommandCursor. wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding), operationContextWithDefaultMaxTime)).getWrapped(); return null; }); From 467ae631e2969ae97657514be4da36b84f1906d8 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 14 Nov 2025 18:32:03 -0800 Subject: [PATCH 8/8] Change error message. --- .../com/mongodb/client/AbstractChangeSteamFunctionalTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java index 20f4314497c..26f24ed0e90 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java @@ -91,7 +91,7 @@ public void shouldDoOneServerSelectionForResumeAttempt() { assertNotNull(changeStreamDocument); int actualCountOfServerSelections = serverSelectionCounter.get(); assertEquals(2, actualCountOfServerSelections, - format("Expected only one additional server selection due to resume after resumable error, but was %s", + format("Expected 2 server selections (initial aggregate command + resume attempt aggregate command), but there were %s", actualCountOfServerSelections)); } }