@@ -110,6 +110,8 @@ export class ChangeStream {
110110
111111 private snapshotChunkLength : number ;
112112
113+ private changeStreamTimeout : number ;
114+
113115 constructor ( options : ChangeStreamOptions ) {
114116 this . storage = options . storage ;
115117 this . metrics = options . metrics ;
@@ -122,6 +124,9 @@ export class ChangeStream {
122124 this . sync_rules = options . storage . getParsedSyncRules ( {
123125 defaultSchema : this . defaultDb . databaseName
124126 } ) ;
127+ // The change stream aggregation command should timeout before the socket times out,
128+ // so we use 90% of the socket timeout value.
129+ this . changeStreamTimeout = Math . ceil ( this . client . options . socketTimeoutMS * 0.9 ) ;
125130
126131 this . abort_signal = options . abort_signal ;
127132 this . abort_signal . addEventListener (
@@ -411,8 +416,10 @@ export class ChangeStream {
411416 private getSourceNamespaceFilters ( ) : { $match : any ; multipleDatabases : boolean } {
412417 const sourceTables = this . sync_rules . getSourceTables ( ) ;
413418
414- let $inFilters : any [ ] = [ { db : this . defaultDb . databaseName , coll : CHECKPOINTS_COLLECTION } ] ;
415- let $refilters : any [ ] = [ ] ;
419+ let $inFilters : { db : string ; coll : string } [ ] = [
420+ { db : this . defaultDb . databaseName , coll : CHECKPOINTS_COLLECTION }
421+ ] ;
422+ let $refilters : { 'ns.db' : string ; 'ns.coll' : RegExp } [ ] = [ ] ;
416423 let multipleDatabases = false ;
417424 for ( let tablePattern of sourceTables ) {
418425 if ( tablePattern . connectionTag != this . connections . connectionTag ) {
@@ -435,10 +442,26 @@ export class ChangeStream {
435442 } ) ;
436443 }
437444 }
445+
446+ // When we have a large number of collections, the performance of the pipeline
447+ // depends a lot on how the filters here are specified.
448+ // Currently, only the multipleDatabases == false case is optimized, and the
449+ // wildcard matching version is not tested (but we assume that will be more
450+ // limited in the number of them).
451+ // Specifically, the `ns: {$in: [...]}` version can lead to PSYNC_S1345 timeouts in
452+ // some cases when we have a large number of collections.
453+ // For details, see:
454+ // https://github.com/powersync-ja/powersync-service/pull/417
455+ // https://jira.mongodb.org/browse/SERVER-114532
456+ const nsFilter = multipleDatabases
457+ ? // cluster-level: filter on the entire namespace
458+ { ns : { $in : $inFilters } }
459+ : // collection-level: filter on coll only
460+ { 'ns.coll' : { $in : $inFilters . map ( ( ns ) => ns . coll ) } } ;
438461 if ( $refilters . length > 0 ) {
439- return { $match : { $or : [ { ns : { $in : $inFilters } } , ...$refilters ] } , multipleDatabases } ;
462+ return { $match : { $or : [ nsFilter , ...$refilters ] } , multipleDatabases } ;
440463 }
441- return { $match : { ns : { $in : $inFilters } } , multipleDatabases } ;
464+ return { $match : nsFilter , multipleDatabases } ;
442465 }
443466
444467 static * getQueryData ( results : Iterable < DatabaseInputRow > ) : Generator < SqliteInputRow > {
@@ -747,11 +770,11 @@ export class ChangeStream {
747770 } else {
748771 fullDocument = 'updateLookup' ;
749772 }
750-
751773 const streamOptions : mongo . ChangeStreamOptions = {
752774 showExpandedEvents : true ,
753775 maxAwaitTimeMS : options . maxAwaitTimeMs ?? this . maxAwaitTimeMS ,
754- fullDocument : fullDocument
776+ fullDocument : fullDocument ,
777+ maxTimeMS : this . changeStreamTimeout
755778 } ;
756779
757780 /**
@@ -1103,6 +1126,10 @@ function mapChangeStreamError(e: any) {
11031126 // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
11041127 // We wrap the error to make it more useful.
11051128 throw new DatabaseConnectionError ( ErrorCode . PSYNC_S1345 , `Timeout while reading MongoDB ChangeStream` , e ) ;
1129+ } else if ( isMongoServerError ( e ) && e . codeName == 'MaxTimeMSExpired' ) {
1130+ // maxTimeMS was reached. Example message:
1131+ // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
1132+ throw new DatabaseConnectionError ( ErrorCode . PSYNC_S1345 , `Timeout while reading MongoDB ChangeStream` , e ) ;
11061133 } else if (
11071134 isMongoServerError ( e ) &&
11081135 e . codeName == 'NoMatchingDocument' &&
0 commit comments