Skip to content

Commit 3d9252c

Browse files
committed
Use ns.coll filters instead of ns filters for single-database change streams.
1 parent 57f7660 commit 3d9252c

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,10 @@ export class ChangeStream {
411411
private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } {
412412
const sourceTables = this.sync_rules.getSourceTables();
413413

414-
let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }];
415-
let $refilters: any[] = [];
414+
let $inFilters: { db: string; coll: string }[] = [
415+
{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }
416+
];
417+
let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = [];
416418
let multipleDatabases = false;
417419
for (let tablePattern of sourceTables) {
418420
if (tablePattern.connectionTag != this.connections.connectionTag) {
@@ -435,10 +437,15 @@ export class ChangeStream {
435437
});
436438
}
437439
}
440+
const nsFilter = multipleDatabases
441+
? // cluster-level: filter on the entire namespace
442+
{ ns: { $in: $inFilters } }
443+
: // collection-level: filter on coll only
444+
{ 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } };
438445
if ($refilters.length > 0) {
439-
return { $match: { $or: [{ ns: { $in: $inFilters } }, ...$refilters] }, multipleDatabases };
446+
return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases };
440447
}
441-
return { $match: { ns: { $in: $inFilters } }, multipleDatabases };
448+
return { $match: nsFilter, multipleDatabases };
442449
}
443450

444451
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {

0 commit comments

Comments
 (0)