diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index a144888f85..ce7127e0dc 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -18,9 +18,12 @@ import com.mongodb.MongoException; +import com.mongodb.ReadPreference; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor; import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.connection.OperationContext; import com.mongodb.lang.NonNull; @@ -232,8 +235,9 @@ private void retryOperation(final AsyncBlock asyncBlock, } else { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, assertNotNull(source).getServerDescription().getMaxWireVersion()); - source.release(); - changeStreamOperation.executeAsync(binding, operationContext, (asyncBatchCursor, t1) -> { + // We wrap the binding so that the selected AsyncConnectionSource is reused, preventing redundant server selection. + // Consequently, the same AsyncConnectionSource remains pinned to the resulting AsyncCommandCursor. + changeStreamOperation.executeAsync(new AsyncSourceAwareReadBinding(source, binding), operationContext, (asyncBatchCursor, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { @@ -242,6 +246,7 @@ private void retryOperation(final AsyncBlock asyncBlock, operationContext); } finally { try { + source.release(); binding.release(); // release the new change stream batch cursor's reference to the binding } finally { resumeableOperation(asyncBlock, callback, operationContext, tryNext); @@ -252,5 +257,51 @@ private void retryOperation(final AsyncBlock asyncBlock, } }); } + + /** + * Does not retain wrapped {@link AsyncReadBinding} as it serves as a wrapper only. + */ + private static class AsyncSourceAwareReadBinding implements AsyncReadBinding { + private final AsyncConnectionSource source; + private final AsyncReadBinding binding; + + AsyncSourceAwareReadBinding(final AsyncConnectionSource source, final AsyncReadBinding binding) { + this.source = source; + this.binding = binding; + } + + @Override + public ReadPreference getReadPreference() { + return binding.getReadPreference(); + } + + @Override + public void getReadConnectionSource(final OperationContext operationContext, final SingleResultCallback callback) { + source.retain(); + callback.onResult(source, null); + } + + @Override + public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference, + final OperationContext operationContext, + final SingleResultCallback callback) { + throw Assertions.fail(); + } + + @Override + public AsyncReadBinding retain() { + return binding.retain(); + } + + @Override + public int release() { + return binding.release(); + } + + @Override + public int getCount() { + return binding.getCount(); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index a750637a10..cf9f1dcf6c 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -19,9 +19,12 @@ import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoException; import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.assertions.Assertions; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.connection.OperationContext; import com.mongodb.lang.Nullable; @@ -251,9 +254,11 @@ private void resumeChangeStream(final OperationContext operationContext) { wrapped.close(operationContextWithDefaultMaxTime); withReadConnectionSource(binding, operationContext, (source, operationContextWithMinRtt) -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); + // We wrap the binding so that the selected ConnectionSource is reused, preventing redundant server selection. + // Consequently, the same ConnectionSource remains pinned to the resulting CommandCursor. + wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding), operationContextWithDefaultMaxTime)).getWrapped(); return null; }); - wrapped = ((ChangeStreamBatchCursor) changeStreamOperation.execute(binding, operationContextWithDefaultMaxTime)).getWrapped(); binding.release(); // release the new change stream batch cursor's reference to the binding } @@ -264,4 +269,48 @@ private boolean hasPreviousNextTimedOut() { private static boolean isTimeoutException(final Throwable exception) { return exception instanceof MongoOperationTimeoutException; } + + /** + * Does not retain wrapped {link @ReadBinding} as it serves as a wrapper only. + */ + private static class SourceAwareReadBinding implements ReadBinding { + private final ConnectionSource source; + private final ReadBinding binding; + + SourceAwareReadBinding(final ConnectionSource source, final ReadBinding binding) { + this.source = source; + this.binding = binding; + } + + @Override + public ReadPreference getReadPreference() { + return binding.getReadPreference(); + } + + @Override + public ConnectionSource getReadConnectionSource(final OperationContext ignored) { + source.retain(); + return source; + } + + @Override + public ConnectionSource getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference, final OperationContext ignored) { + throw Assertions.fail(); + } + + @Override + public int getCount() { + return binding.getCount(); + } + + @Override + public ReadBinding retain() { + return binding.retain(); + } + + @Override + public int release() { + return binding.release(); + } + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java index 3ce014986e..69ffa05164 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java @@ -25,6 +25,7 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.ReadWriteBinding; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.NoOpSessionContext; import com.mongodb.internal.connection.OperationContext; @@ -216,7 +217,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { verify(newCursor).next(operationContextCaptor.capture())); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); verify(changeStreamOperation, times(1)).getDecoder(); - verify(changeStreamOperation, times(1)).execute(eq(readBinding), any()); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any()); verifyNoMoreInteractions(changeStreamOperation); verify(newCursor, times(1)).next(any()); verify(newCursor, atLeastOnce()).getPostBatchResumeToken(); @@ -245,7 +246,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() { void shouldPropagateAnyErrorsOccurredInAggregateOperation() { when(cursor.next(any())).thenThrow(new MongoOperationTimeoutException("timeout")); MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()); - when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(resumableError); + when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(resumableError); ChangeStreamBatchCursor cursor = createChangeStreamCursor(); //when @@ -272,12 +273,12 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() { clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding); //second next operation times out on resume attempt when creating change stream - when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow( + when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow( new MongoOperationTimeoutException("timeout during resumption")); assertThrows(MongoOperationTimeoutException.class, cursor::next); - clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation); + clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding); - doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(eq(readBinding), any()); + doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class), any()); //when third operation succeeds to resume and call next sleep(TIMEOUT_CONSUMPTION_SLEEP_MS); @@ -308,7 +309,7 @@ void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() { clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding); //when second next operation errors on resume attempt when creating change stream - when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow( + when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow( new MongoNotPrimaryException(new BsonDocument(), new ServerAddress())); assertThrows(MongoNotPrimaryException.class, cursor::next); @@ -344,7 +345,11 @@ private void verifyNoResumeAttemptCalled() { private void verifyResumeAttemptCalled() { verify(cursor, times(1)).close(any()); verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - verify(changeStreamOperation, times(1)).execute(eq(readBinding), any()); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any()); + verifyNoMoreInteractions(cursor); + verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any()); + // Verify server selection is done once for the resume attempt. + verify(readBinding, times(1)).getReadConnectionSource(any()); verifyNoMoreInteractions(cursor); } @@ -394,10 +399,14 @@ void setUp() { changeStreamOperation = mock(ChangeStreamOperation.class); when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec()); doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion); - when(changeStreamOperation.execute(eq(readBinding), any())).thenReturn(newChangeStreamCursor); + when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenAnswer(invocation -> { + ReadBinding binding = invocation.getArgument(0); + OperationContext operationContext = invocation.getArgument(1); + binding.getReadConnectionSource(operationContext); + return newChangeStreamCursor; + }); } - private void assertTimeoutWasRefreshedForOperation(final TimeoutContext timeoutContextUsedForOperation) { assertNotNull(timeoutContextUsedForOperation.getTimeout(), "TimeoutMs was not set"); timeoutContextUsedForOperation.getTimeout().run(TimeUnit.MILLISECONDS, () -> { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java new file mode 100644 index 0000000000..3b20415001 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ChangeStreamFunctionalTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.AbstractChangeSteamFunctionalTest; +import com.mongodb.client.MongoClient; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; + +public class ChangeStreamFunctionalTest extends AbstractChangeSteamFunctionalTest { + @Override + protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) { + return new SyncMongoClient(mongoClientSettings); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java new file mode 100644 index 0000000000..26f24ed0e9 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractChangeSteamFunctionalTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.ClusterFixture; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoNamespace; +import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.test.CollectionHelper; +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.bson.codecs.BsonDocumentCodec; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.client.Fixture.getDefaultDatabaseName; +import static java.lang.String.format; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * The {@link ChangeStreamProseTest}, which is defined only for sync driver, should be migrated to this class. + * Once this done, this class should be renamed to ChangeStreamProseTest. + */ +public abstract class AbstractChangeSteamFunctionalTest { + + private static final String FAIL_COMMAND_NAME = "failCommand"; + private static final MongoNamespace NAMESPACE = new MongoNamespace(getDefaultDatabaseName(), "test"); + private final CollectionHelper collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE); + + protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings); + + @Test + public void shouldDoOneServerSelectionForResumeAttempt() { + //given + assumeTrue(ClusterFixture.isDiscoverableReplicaSet()); + AtomicInteger serverSelectionCounter = new AtomicInteger(); + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + try (MongoClient mongoClient = createMongoClient(Fixture.getMongoClientSettingsBuilder() + .applyToClusterSettings(builder -> builder.serverSelector(clusterDescription -> { + serverSelectionCounter.incrementAndGet(); + return clusterDescription.getServerDescriptions(); + })).build())) { + + MongoCollection collection = mongoClient + .getDatabase(NAMESPACE.getDatabaseName()) + .getCollection(NAMESPACE.getCollectionName()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + + " mode: {" + + " times: 1" + + " }," + + " data: {" + + " failCommands: ['getMore']," + + " errorCode: 10107," + + " errorLabels: ['ResumableChangeStreamError']" + + " }" + + "}"); + // We insert document here, because async cursor performs aggregate and getMore right after we call cursor() + collection.insertOne(Document.parse("{ x: 1 }")); + serverSelectionCounter.set(0); + + try (MongoChangeStreamCursor> cursor = collection.watch() + .batchSize(0) + .startAtOperationTime(startTime) + .cursor()) { + + //when + ChangeStreamDocument changeStreamDocument = cursor.next(); + //then + assertNotNull(changeStreamDocument); + int actualCountOfServerSelections = serverSelectionCounter.get(); + assertEquals(2, actualCountOfServerSelections, + format("Expected 2 server selections (initial aggregate command + resume attempt aggregate command), but there were %s", + actualCountOfServerSelections)); + } + } + } + + @AfterEach + public void tearDown() throws InterruptedException { + ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME); + collectionHelper.drop(); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java b/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java new file mode 100644 index 0000000000..c4ec6286ce --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/ChangeSteamFunctionalTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.MongoClientSettings; + +public class ChangeSteamFunctionalTest extends AbstractChangeSteamFunctionalTest { + + @Override + protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) { + return MongoClients.create(mongoClientSettings); + } +}