Conversation

@rkistner (Contributor) commented Nov 27, 2025

This optimizes MongoDB change stream filters to use {'ns.coll': {$in: ['a', 'b', 'c']}} rather than {'ns': {'$in': [{db: 'db', coll: 'a'}, {db: 'db', coll: 'b'}, {db: 'db', coll: 'c'}]}} in cases where only a single database is being replicated.

This appears to improve oplog scanning performance when many individual collections (say 30+) are being replicated: I observed a 10x or greater difference in scanning time in my testing. This should help to significantly reduce the occurrence of [PSYNC_S1345] Timeout while reading MongoDB ChangeStream errors.

This also sets maxTimeMS on the change stream query, so that we get a query timeout rather than a socket timeout. Before: [PSYNC_S1345] Timeout while reading MongoDB ChangeStream cause: connection 2 to 1.2.3.4:27017 timed out. After: [PSYNC_S1345] Timeout while reading MongoDB ChangeStream cause: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit
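As a rough sketch of what the resulting command shape could look like (the pipeline shape is borrowed from the test commands later in this PR; the 60s limit is an illustrative value, not the PR's actual setting):

```javascript
// Sketch only: aggregate command with a server-side time limit.
// With maxTimeMS set, the server aborts with "operation exceeded time limit"
// instead of the client hitting a socket timeout.
const changeStreamCommand = {
  aggregate: 1,
  pipeline: [
    { $changeStream: { fullDocument: "required", showExpandedEvents: true } },
    { $changeStreamSplitLargeEvent: {} },
  ],
  cursor: { batchSize: 100 },
  maxTimeMS: 60_000, // illustrative value
};
```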

Background

We're seeing more and more users run into this error:

PSYNC_S1345: Failed to read MongoDB Change Stream due to a timeout.

Example Discord thread

Most commonly this happens when:

  1. There are multiple databases in the same source cluster.
  2. There are bulk updates running in a database (or collection) other than the one being replicated by PowerSync.

For example, when a staging and a production database use the same cluster and the larger production database receives some bulk updates, the staging PowerSync instance may get this error. Luckily, the issue is less common in production setups.

One case we have seen in production is when the same cluster is used both for the source database and the bucket storage database. In that case, doing initial replication causes bulk updates in the cluster, which can trigger this issue.

The main problem is that the instance cannot recover from this automatically.

Why does this happen?

Similar to logical decoding on Postgres, MongoDB change streams effectively do a sequential scan through the oplog. If there are many changes in the oplog unrelated to the Change Stream, MongoDB may need to scan through a lot before receiving the first change. In that time, it does not return any response, and eventually the query or connection times out. Since there is no intermediate progress (resumeToken) returned, the next attempt restarts at the same point, and receives the same error.

Performance is affected a lot by the change stream pipeline. MongoDB internally converts the $changeStream pipeline into an oplog aggregation pipeline. This uses many internal stages: for example, first running some pre-filters, then doing the fullDocument lookup, then applying additional filters.

What we've seen during testing is that the current approach of filtering collections using {'ns': {'$in': [{db: 'db', coll: 'a'}, {db: 'db', coll: 'b'}, {db: 'db', coll: 'c'}]}} is expanded into a massive oplog aggregation pipeline, which can be very slow.

Fixing the issue

There is not much we can do to capture progress or recover from the issue if we consistently run into timeouts, so our main options are to (1) increase the timeout, or (2) improve the performance of the pipeline. To improve the performance, one option is to simply remove the collection filters and stream changes for all collections in the database. In my testing that mostly resolves the issue, but it could also stream a lot of changes from collections that we don't use.

Luckily, changing the collection filters to the form {'ns.coll': {$in: ['a', 'b', 'c']}} appears to give a massive performance gain here.
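The difference between the two filter shapes can be sketched as follows (database and collection names here are illustrative):

```javascript
// Building the two filter shapes for a single replicated database.
const dbName = "db";
const collections = ["a", "b", "c"];

// Old form: one {db, coll} subdocument per collection. MongoDB expands this
// into a much larger oplog aggregation pipeline.
const oldFilter = {
  ns: { $in: collections.map((coll) => ({ db: dbName, coll })) },
};

// New form: match the database once, then filter on collection names only.
const newFilter = {
  "ns.db": dbName,
  "ns.coll": { $in: collections },
};
```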

Multiple databases

This PR still uses the old ns filters when replicating multiple databases on the same cluster. We can investigate and address this in a future PR.
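A hypothetical helper (not the PR's actual code) illustrating how the filter form could be selected:

```javascript
// Hypothetical sketch: use the optimized ns.coll form only when every source
// collection lives in the same database; otherwise keep the old ns filter.
function buildNsFilter(sources /* Array<{ db: string, coll: string }> */) {
  const dbs = new Set(sources.map((s) => s.db));
  if (dbs.size === 1) {
    const [db] = dbs;
    return { "ns.db": db, "ns.coll": { $in: sources.map((s) => s.coll) } };
  }
  // Multiple databases: fall back to the original whole-ns $in filter.
  return { ns: { $in: sources.map(({ db, coll }) => ({ db, coll })) } };
}
```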

Performance Testing

These tests were all run on MongoDB 7.0. 8.0 has slightly different internal pipelines, but similar performance.

For my testing, I used an M10 Atlas cluster, initialized with sample data (specifically sample_airbnb for this, although the only thing that really matters is the volume of data). Then:

On the test db, capture a resumeToken:

db.runCommand({
  aggregate: 1,
  pipeline: [{$changeStream: {showExpandedEvents: true}}],
  cursor: { batchSize: 10 }
});

Then, on sample_airbnb, generate lots of updates:

for (let i = 0; i < 100; i++) {
  db.listingsAndReviews.updateMany({}, { $set: { t: Math.random(), i: i } });
  print(i);
}

Then we can test performance of various change stream pipelines on the test database.

Case 1

Current-style ns filtering on 50 test collections (which don't actually exist): takes around 17s in my tests.

start = Date.now();
print(
  db.runCommand({
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          ns: {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            ].map((i) => ({ db: "test", coll: `test${i}` })),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  })
);
print(Date.now() - start);

// Explain:
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          ns: {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            ].map((i) => ({ db: "test", coll: `test${i}` })),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  },
  verbosity: "executionStats"
});

Explain output: https://gist.github.com/rkistner/6140f2b80831375266ab4e94bbf6b955

Case 2

We can remove the collection filters, reducing the time to around 1.7s in my tests.

start = Date.now();
print(
  db.runCommand({
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 },
    explain: false,
  })
);
print(Date.now() - start);

// Explain:
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  },
  verbosity: "executionStats"
});

Explain output: https://gist.github.com/rkistner/f07709ae52c4eee70495987c732a991f

Case 3

Here we use the ns.coll-style collection filters: takes around 1.7-2.1s in my tests.

start = Date.now();
print(
  db.runCommand({
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          'ns.db': 'test',
          'ns.coll': {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            ].map((i) => `test${i}`),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  })
);
print(Date.now() - start);

// Explain:
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          'ns.db': 'test',
          'ns.coll': {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            ].map((i) => `test${i}`),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  },
  verbosity: "executionStats"
});

Explain output: https://gist.github.com/rkistner/c14df923f1bd1b419db6f8e800b6aa05

Analysis

Some notes from the explain output and other testing:

  1. In each case, the ns filters are pushed down into the first-stage oplog pipeline, but the exact operations are very different between the two. The filters from case 1 are much more verbose.
  2. Even with showExpandedEvents: false there are a lot of filters on e.g. o.dropIndexes. This causes quite a large pipeline overall.

Overall, the pipeline appears to be roughly:

  1. Scan the oplog, filtering by namespace and op type (pulled up from the $match filter from the original pipeline)
  2. internalUnwindTransaction
  3. internalTransform - might do the fullDocument: 'required' lookup (although that is also referenced below), as well as showExpandedEvents.
  4. $_internalChangeStreamCheckInvalidate
  5. $_internalChangeStreamCheckResumability
  6. $match on high-level operationType and ns
  7. $_internalChangeStreamAddPostImage
  8. $changeStreamSplitLargeEvent
  9. $_internalChangeStreamEnsureResumeTokenPresent

Looking at executionTimeMillisEstimate, practically all the time is spent in the first stage of scanning the oplog.

Looking at the filters, we can see that case 1 has an $or stage with 351 conditions, and MongoDB has to evaluate each of these conditions on each document. In case 3, by contrast, the first condition effectively filters on just the database name, which is a much cheaper check.

We can run these filters directly against the oplog collection itself:

let ee = db.runCommand({ explain: { ... } }); // any of the explain commands above
let filter = ee.stages[0].$cursor.queryPlanner.parsedQuery;
use local
db.oplog.rs.find(filter).count()

Working with actual data

The above analysis focused on performance when there is no actual data in the change stream after the filters, which is the case causing the timeout. But what about replication performance where there is a lot of data that we do need to replicate?

For this, we run the same tests against the sample_airbnb database with the same data. We use the same setup as above, which gives us around 438k change documents. We run the same 3 test cases, but against the sample_airbnb database, and include the listingsAndReviews collection.

Case 1B (ns filters): 129s.
Case 2B (no filters): 80s.
Case 3B (ns.coll filters): 83s.

Conclusion: While the impact is less drastic, the pipeline still has a significant effect here, and ns.coll filters still appear to be a good option.

Code:

// Case 1B
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          ns: {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            ].map((i) => ({ db: "sample_airbnb", coll: `test${i}` })).concat([{db: 'sample_airbnb', coll: 'listingsAndReviews'}]),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  }
});

// Case 2B:
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  }
});

// Case 3B:
db.runCommand({
  explain: {
    aggregate: 1,
    pipeline: [
      {
        $changeStream: {
          fullDocument: "required",
          showExpandedEvents: true,
          resumeAfter: { _data: "826928363C000000022B0429296E1404" },
        },
      },
      {
        $match: {
          'ns.db': 'sample_airbnb',
          'ns.coll': {
            $in: [
              1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
              20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
              36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
            ].map((i) => `test${i}`).concat(['listingsAndReviews']),
          },
        },
      },
      { $changeStreamSplitLargeEvent: {} },
    ],
    cursor: { batchSize: 2 }
  }
});

MongoDB 8.0

The tests above were all run on MongoDB 7.0. After upgrading the cluster to MongoDB 8.0 and 8.2.2, all 3 tests appear to run slightly faster, but the relative performance between them is still about the same.

@changeset-bot commented Nov 27, 2025

🦋 Changeset detected

Latest commit: f43675e

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 11 packages:

Name                                          Type
@powersync/service-module-mongodb             Patch
@powersync/service-core                       Patch
@powersync/service-image                      Patch
@powersync/service-schema                     Patch
@powersync/service-core-tests                 Patch
@powersync/service-module-core                Patch
@powersync/service-module-mongodb-storage     Patch
@powersync/service-module-mysql               Patch
@powersync/service-module-postgres-storage    Patch
@powersync/service-module-postgres            Patch
test-client                                   Patch


@rkistner rkistner changed the title [MongoDB] Optimize change stream filters to avoid PSYNC_S1345 [MongoDB] Optimize change stream filters to avoid PSYNC_S1345 timeouts Nov 27, 2025
@rkistner rkistner marked this pull request as ready for review November 27, 2025 14:38
@stevensJourney (Collaborator) previously approved these changes Nov 27, 2025:

This is an excellent description of the issue and solution. Happy with the changes :D

@rkistner (Author) commented:

Logged issue upstream: https://jira.mongodb.org/browse/SERVER-114532

@rkistner rkistner merged commit c050acd into main Nov 28, 2025
22 checks passed
@rkistner rkistner deleted the mongodb-pipeline-filters branch November 28, 2025 15:53