diff --git a/.changeset/little-pants-call.md b/.changeset/little-pants-call.md new file mode 100644 index 00000000..aa3cafee --- /dev/null +++ b/.changeset/little-pants-call.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[MongoDB] Optimize change stream filters to avoid PSYNC_S1345 timeouts diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 62d3cda1..0fa47f8a 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -110,6 +110,8 @@ export class ChangeStream { private snapshotChunkLength: number; + private changeStreamTimeout: number; + constructor(options: ChangeStreamOptions) { this.storage = options.storage; this.metrics = options.metrics; @@ -122,6 +124,9 @@ export class ChangeStream { this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: this.defaultDb.databaseName }); + // The change stream aggregation command should timeout before the socket times out, + // so we use 90% of the socket timeout value. + this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9); this.abort_signal = options.abort_signal; this.abort_signal.addEventListener( @@ -411,8 +416,10 @@ export class ChangeStream { private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } { const sourceTables = this.sync_rules.getSourceTables(); - let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }]; - let $refilters: any[] = []; + let $inFilters: { db: string; coll: string }[] = [ + { db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION } + ]; + let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = []; let multipleDatabases = false; for (let tablePattern of sourceTables) { if (tablePattern.connectionTag != this.connections.connectionTag) { @@ -435,10 +442,26 @@ export class ChangeStream { }); } } + + // When we have a large number of collections, the performance of the pipeline + // depends a lot on how the filters here are specified. + // Currently, only the multipleDatabases == false case is optimized, and the + // wildcard matching version is not tested (but we assume that will be more + // limited in the number of them). + // Specifically, the `ns: {$in: [...]}` version can lead to PSYNC_S1345 timeouts in + // some cases when we have a large number of collections. + // For details, see: + // https://github.com/powersync-ja/powersync-service/pull/417 + // https://jira.mongodb.org/browse/SERVER-114532 + const nsFilter = multipleDatabases + ? // cluster-level: filter on the entire namespace + { ns: { $in: $inFilters } } + : // collection-level: filter on coll only + { 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } }; if ($refilters.length > 0) { - return { $match: { $or: [{ ns: { $in: $inFilters } }, ...$refilters] }, multipleDatabases }; + return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases }; } - return { $match: { ns: { $in: $inFilters } }, multipleDatabases }; + return { $match: nsFilter, multipleDatabases }; } static *getQueryData(results: Iterable): Generator { @@ -747,11 +770,11 @@ export class ChangeStream { } else { fullDocument = 'updateLookup'; } - const streamOptions: mongo.ChangeStreamOptions = { showExpandedEvents: true, maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, - fullDocument: fullDocument + fullDocument: fullDocument, + maxTimeMS: this.changeStreamTimeout }; /** @@ -1103,6 +1126,10 @@ function mapChangeStreamError(e: any) { // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". // We wrap the error to make it more useful. throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { + // maxTimeMS was reached. Example message: + // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); } else if ( isMongoServerError(e) && e.codeName == 'NoMatchingDocument' &&