Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/little-pants-call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[MongoDB] Optimize change stream filters to avoid PSYNC_S1345 timeouts
39 changes: 33 additions & 6 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ export class ChangeStream {

private snapshotChunkLength: number;

private changeStreamTimeout: number;

constructor(options: ChangeStreamOptions) {
this.storage = options.storage;
this.metrics = options.metrics;
Expand All @@ -122,6 +124,9 @@ export class ChangeStream {
this.sync_rules = options.storage.getParsedSyncRules({
defaultSchema: this.defaultDb.databaseName
});
// The change stream aggregation command should time out before the socket times out,
// so we use 90% of the socket timeout value.
this.changeStreamTimeout = Math.ceil(this.client.options.socketTimeoutMS * 0.9);

this.abort_signal = options.abort_signal;
this.abort_signal.addEventListener(
Expand Down Expand Up @@ -411,8 +416,10 @@ export class ChangeStream {
private getSourceNamespaceFilters(): { $match: any; multipleDatabases: boolean } {
const sourceTables = this.sync_rules.getSourceTables();

let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }];
let $refilters: any[] = [];
let $inFilters: { db: string; coll: string }[] = [
{ db: this.defaultDb.databaseName, coll: CHECKPOINTS_COLLECTION }
];
let $refilters: { 'ns.db': string; 'ns.coll': RegExp }[] = [];
let multipleDatabases = false;
for (let tablePattern of sourceTables) {
if (tablePattern.connectionTag != this.connections.connectionTag) {
Expand All @@ -435,10 +442,26 @@ export class ChangeStream {
});
}
}

// When we have a large number of collections, the performance of the pipeline
// depends a lot on how the filters here are specified.
// Currently, only the multipleDatabases == false case is optimized, and the
// wildcard matching version is not tested (but we assume the number of wildcard
// filters will be more limited).
// Specifically, the `ns: {$in: [...]}` version can lead to PSYNC_S1345 timeouts in
// some cases when we have a large number of collections.
// For details, see:
// https://github.com/powersync-ja/powersync-service/pull/417
// https://jira.mongodb.org/browse/SERVER-114532
const nsFilter = multipleDatabases
? // cluster-level: filter on the entire namespace
{ ns: { $in: $inFilters } }
: // collection-level: filter on coll only
{ 'ns.coll': { $in: $inFilters.map((ns) => ns.coll) } };
if ($refilters.length > 0) {
return { $match: { $or: [{ ns: { $in: $inFilters } }, ...$refilters] }, multipleDatabases };
return { $match: { $or: [nsFilter, ...$refilters] }, multipleDatabases };
}
return { $match: { ns: { $in: $inFilters } }, multipleDatabases };
return { $match: nsFilter, multipleDatabases };
}

static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
Expand Down Expand Up @@ -747,11 +770,11 @@ export class ChangeStream {
} else {
fullDocument = 'updateLookup';
}

const streamOptions: mongo.ChangeStreamOptions = {
showExpandedEvents: true,
maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS,
fullDocument: fullDocument
fullDocument: fullDocument,
maxTimeMS: this.changeStreamTimeout
};

/**
Expand Down Expand Up @@ -1103,6 +1126,10 @@ function mapChangeStreamError(e: any) {
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
// We wrap the error to make it more useful.
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
} else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') {
// maxTimeMS was reached. Example message:
// MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
} else if (
isMongoServerError(e) &&
e.codeName == 'NoMatchingDocument' &&
Expand Down