From 3d9252c02a99e19ae353ed5b8329d3bfd485dd89 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 27 Nov 2025 14:05:32 +0200 Subject: [PATCH 1/4] Use ns.coll filters instead of ns filters for single-database change streams. --- .../src/replication/ChangeStream.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 62d3cda1..abcca640 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -411,8 +411,10 @@ export class ChangeStream { private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { const sourceTables = this.sync_rules.getSourceTables(); - let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }]; - let $refilters: any[] = []; + let $inFilters: { db: string; coll: string }[] = [ + { db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION } + ]; + let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = []; let multipleDatabases = false; for (let tablePattern of sourceTables) { if (tablePattern.connectionTag != this.connections.connectionTag) { @@ -435,10 +437,15 @@ export class ChangeStream { }); } } + const nsFilter = multipleDatabases + ? // cluster-level: filter on the entire namespace + { ns: { $in: $inFilters } } + : // collection-level: filter on coll only + { 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } }; if ($refilters.length > 0) { - return { $match: { $or: [{ ns: { $in: $inFilters } }, ...$refilters] }, multipleDatabases }; + return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases }; } - return { $match: { ns: { $in: $inFilters } }, multipleDatabases }; + return { $match: nsFilter, multipleDatabases }; } static *getQueryData(results: Iterable): Generator { From 12b5cee7e94217379605c5c67d2ffda161875cd7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 27 Nov 2025 14:26:14 +0200 Subject: [PATCH 2/4] Set maxTimeMS on change streams. --- .../module-mongodb/src/replication/ChangeStream.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index abcca640..1452de52 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -110,6 +110,8 @@ export class ChangeStream { private snapshotChunkLength: number; + private changeStreamTimeout: number; + constructor(options: ChangeStreamOptions) { this.storage = options.storage; this.metrics = options.metrics; @@ -122,6 +124,9 @@ export class ChangeStream { this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: this.defaultDb.databaseName }); + // The change stream aggregation command should timeout before the socket times out, + // so we use 90% of the socket timeout value. + this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9); this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -754,11 +759,11 @@ export class ChangeStream { } else { fullDocument = 'updateLookup'; } - const streamOptions: mongo.ChangeStreamOptions = { showExpandedEvents: true, maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, - fullDocument: fullDocument + fullDocument: fullDocument, + maxTimeMS: this.changeStreamTimeout }; /** @@ -1110,6 +1115,10 @@ function mapChangeStreamError(e: any) { // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". // We wrap the error to make it more useful. throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { + // maxTimeMS was reached. Example message: + // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); } else if ( isMongoServerError(e) && e.codeName == 'NoMatchingDocument' && From 40c08c3ec5b50100e4ec49be8492c60a1464e3e5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 27 Nov 2025 16:32:14 +0200 Subject: [PATCH 3/4] Add changeset. --- .changeset/little-pants-call.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/little-pants-call.md diff --git a/.changeset/little-pants-call.md b/.changeset/little-pants-call.md new file mode 100644 index 00000000..aa3cafee --- /dev/null +++ b/.changeset/little-pants-call.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[MongoDB] Optimize change stream filters to avoid PSYNC_S1345 timeouts From 5c42854ff9f1b64f12d5ca3eb50369b7f36f7775 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 28 Nov 2025 10:48:41 +0200 Subject: [PATCH 4/4] Add comments on the performance issue. --- .../module-mongodb/src/replication/ChangeStream.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 1452de52..0fa47f8a 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -442,6 +442,17 @@ export class ChangeStream { }); } } + + // When we have a large number of collections, the performance of the pipeline + // depends a lot on how the filters here are specified. + // Currently, only the multipleDatabases == false case is optimized, and the + // wildcard matching version is not tested (but we assume that will be more + // limited in the number of them). + // Specifically, the `ns: {$in: [...]}` version can lead to PSYNC_S1345 timeouts in + // some cases when we have a large number of collections. + // For details, see: + // https://github.com/powersync-ja/powersync-service/pull/417 + // https://jira.mongodb.org/browse/SERVER-114532 const nsFilter = multipleDatabases ? // cluster-level: filter on the entire namespace { ns: { $in: $inFilters } }