Skip to content

Commit 12b5cee

Browse files
committed
Set maxTimeMS on change streams.
1 parent 3d9252c commit 12b5cee

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ export class ChangeStream {
110110

111111
private snapshotChunkLength: number;
112112

113+
private changeStreamTimeout: number;
114+
113115
constructor(options: ChangeStreamOptions) {
114116
this.storage = options.storage;
115117
this.metrics = options.metrics;
@@ -122,6 +124,9 @@ export class ChangeStream {
122124
this.sync_rules = options.storage.getParsedSyncRules({
123125
defaultSchema: this.defaultDb.databaseName
124126
});
127+
// The change stream aggregation command should timeout before the socket times out,
128+
// so we use 90% of the socket timeout value.
129+
this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9);
125130

126131
this.abort_signal = options.abort_signal;
127132
this.abort_signal.addEventListener(
@@ -754,11 +759,11 @@ export class ChangeStream {
754759
} else {
755760
fullDocument = 'updateLookup';
756761
}
757-
758762
const streamOptions: mongo.ChangeStreamOptions = {
759763
showExpandedEvents: true,
760764
maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS,
761-
fullDocument: fullDocument
765+
fullDocument: fullDocument,
766+
maxTimeMS: this.changeStreamTimeout
762767
};
763768

764769
/**
@@ -1110,6 +1115,10 @@ function mapChangeStreamError(e: any) {
11101115
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
11111116
// We wrap the error to make it more useful.
11121117
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
1118+
} else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') {
1119+
// maxTimeMS was reached. Example message:
1120+
// MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
1121+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
11131122
} else if (
11141123
isMongoServerError(e) &&
11151124
e.codeName == 'NoMatchingDocument' &&

0 commit comments

Comments
 (0)