From 828eb6dad4a292e5438dc8295e8d005b178b5acf Mon Sep 17 00:00:00 2001 From: fox0430 Date: Tue, 16 Jun 2026 16:47:15 +0900 Subject: [PATCH] Track confirmed flush LSN so auto keepalive replies preserve at-least-once --- async_postgres/pg_connection/types.nim | 62 +++++- async_postgres/pg_replication.nim | 273 +++++++++++++++++++----- examples/replication.nim | 25 ++- tests/mock_pg_server.nim | 78 +++++++ tests/test_e2e.nim | 9 +- tests/test_physical_replication.nim | 78 +++++-- tests/test_replication_keepalive.nim | 283 +++++++++++++++++++------ 7 files changed, 656 insertions(+), 152 deletions(-) diff --git a/async_postgres/pg_connection/types.nim b/async_postgres/pg_connection/types.nim index 0e7fdf2..e8bd0af 100644 --- a/async_postgres/pg_connection/types.nim +++ b/async_postgres/pg_connection/types.nim @@ -270,6 +270,20 @@ type ## borrower. Raw `acquire` / `release(conn)` callers carry that risk; ## `PooledConnHandle` (its own `released` flag) and the `with*` templates ## are the fully safe paths. + replConfirmedFlushLsnRaw: uint64 + ## Internal replication state: raw LSN value up to which the application + ## has confirmed received WAL is durably flushed during a replication + ## stream. Public API users go through `confirmFlushed` / + ## `confirmedFlushLsn` in `pg_replication`, which add the `Lsn` typing, + ## the monotonic guard, and the received-WAL bound check. Manipulated + ## via the exported helpers in this module (see below). + replMaxReceivedLsnRaw: uint64 + ## Internal replication state: highest WAL position actually received + ## from the wire during the current stream (`XLogData` end LSN, not the + ## server's `walEnd`). Initialised to the stream's `startLsn` and updated + ## by `startReplication` / `startPhysicalReplication`. Used by + ## `confirmFlushed` to reject confirmations beyond received WAL. + ## Manipulated via the exported helpers in this module (see below). QueryResult* = object ## Result of a query: field descriptions, row data, and command tag. @@ -651,7 +665,53 @@ func effectiveMaxMessageSize*(conn: PgConnection): int {.inline.} = else: DefaultMaxBackendMessageLen -# Error builder +# Internal replication LSN plumbing (cross-module use within the library) +# +# `pg_replication` owns the typed `Lsn` API (`confirmFlushed`, +# `confirmedFlushLsn`); `types` stores raw `uint64` values to avoid a dependency +# cycle. The procedures below are exported so `pg_replication` can use them +# without `{.all.}`, but they are intentionally low-level: the public way to +# advance the confirmed-flush position remains `confirmFlushed` in +# `pg_replication`, which adds the `Lsn` typing and the received-WAL bound check. + +proc initReplLsnTracking*(conn: PgConnection, startLsn: uint64) = + ## Reset the per-stream confirmed-flush and max-received positions to + ## `startLsn`. Called at the beginning of each replication stream so a reused + ## connection never inherits stale values from a previous stream. + conn.replConfirmedFlushLsnRaw = startLsn + conn.replMaxReceivedLsnRaw = startLsn + +proc updateReplMaxReceivedLsn*(conn: PgConnection, received: uint64): bool = + ## Advance the highest-received WAL position if `received` is greater than + ## the current value. Returns `true` if the value was updated. + if received > conn.replMaxReceivedLsnRaw: + conn.replMaxReceivedLsnRaw = received + true + else: + false + +func replConfirmedFlushLsn*(conn: PgConnection): uint64 = + ## Raw confirmed-flush LSN. Public API users should use `confirmedFlushLsn` + ## in `pg_replication`. + conn.replConfirmedFlushLsnRaw + +func replMaxReceivedLsn*(conn: PgConnection): uint64 = + ## Raw max-received LSN. Public API users should use the `confirmFlushed` + ## bounds via the typed API in `pg_replication`. + conn.replMaxReceivedLsnRaw + +proc confirmReplFlushed*(conn: PgConnection, lsn: uint64): bool = + ## Clamp `lsn` to the max-received WAL position and advance the + ## confirmed-flush LSN only monotonically. Returns `true` if the position + ## moved forward. This is the raw, untyped helper used by `confirmFlushed` + ## in `pg_replication`; public callers should use that typed API. + let bounded = + if lsn > conn.replMaxReceivedLsnRaw: conn.replMaxReceivedLsnRaw else: lsn + if bounded > conn.replConfirmedFlushLsnRaw: + conn.replConfirmedFlushLsnRaw = bounded + true + else: + false proc newPgQueryError*(fields: seq[ErrorField]): ref PgQueryError = ## Create a PgQueryError from server ErrorResponse fields. diff --git a/async_postgres/pg_replication.nim b/async_postgres/pg_replication.nim index 47db2e9..3d5beb8 100644 --- a/async_postgres/pg_replication.nim +++ b/async_postgres/pg_replication.nim @@ -670,7 +670,7 @@ proc timelineHistory*( # Replication streaming -proc parseReplicationMessage*(copyData: seq[byte]): ReplicationMessage = +proc parseReplicationMessage*(copyData: openArray[byte]): ReplicationMessage = ## Parse a CopyData payload into a ReplicationMessage. if copyData.len == 0: raise newException(PgProtocolError, "Empty replication CopyData") @@ -718,6 +718,24 @@ proc sendCopyData*(conn: PgConnection, data: openArray[byte]): Future[void] = encodeCopyData(buf, data) conn.sendMsg(buf) +proc sendStandbyStatusRaw( + conn: PgConnection, receiveLsn, flushLsn, applyLsn: Lsn, replyRequested: bool +): Future[void] {.async.} = + ## Encode and send a Standby Status Update with the given receive/flush/apply + ## LSNs verbatim — no ``InvalidLsn`` defaulting. This is the single place the + ## wire encoding lives; the public ``sendStandbyStatus`` (which applies the + ## up-to-receive defaulting) and ``sendConfirmedStatus`` (which sends the + ## confirmed position verbatim) both route through it. Callers are responsible + ## for the ``csReplicating`` guard. + let msg = encodeStandbyStatusUpdate( + receiveLsn.toInt64, + flushLsn.toInt64, + applyLsn.toInt64, + currentPgTimestamp(), + if replyRequested: 1'u8 else: 0'u8, + ) + await conn.sendMsg(msg) + proc sendStandbyStatus*( conn: PgConnection, receiveLsn: Lsn, @@ -727,6 +745,14 @@ proc sendStandbyStatus*( ): Future[void] {.async.} = ## Send a Standby Status Update to the server during replication streaming. ## Must be called while the connection is in ``csReplicating`` state. + ## + ## When ``flushLsn``/``applyLsn`` are left at ``InvalidLsn`` (``0/0``) they + ## default *up to* ``receiveLsn`` — convenient for callers that ACK received + ## data eagerly. Pass an explicit ``flushLsn``/``applyLsn`` to report a + ## position behind ``receiveLsn`` (e.g. only what the callback has durably + ## flushed). The automatic keepalive reply does not use this proc; it sends + ## the confirmed-flush position verbatim via an internal path so it never + ## inflates flush to merely-received WAL. if conn.state != csReplicating: raise newException( PgConnectionError, @@ -735,14 +761,117 @@ proc sendStandbyStatus*( ) let flushVal = if flushLsn == InvalidLsn: receiveLsn else: flushLsn let applyVal = if applyLsn == InvalidLsn: receiveLsn else: applyLsn - let msg = encodeStandbyStatusUpdate( - receiveLsn.toInt64, - flushVal.toInt64, - applyVal.toInt64, - currentPgTimestamp(), - if replyRequested: 1'u8 else: 0'u8, + await conn.sendStandbyStatusRaw(receiveLsn, flushVal, applyVal, replyRequested) + +proc confirmedFlushLsn*(conn: PgConnection): Lsn {.inline.} = + ## Highest LSN the application has confirmed durably flushed for the current + ## replication stream via ``confirmFlushed``. Initialised to the stream's + ## ``startLsn`` by ``startReplication`` / ``startPhysicalReplication``; this is + ## the flush/apply position carried by automatic keepalive replies. + ## + ## Only meaningful during an active stream: outside ``csReplicating`` (before a + ## stream starts or after it ends) this returns ``InvalidLsn`` (``0/0``) rather + ## than a stale value left over from a previous stream. + if conn.state != csReplicating: + return InvalidLsn + Lsn(conn.replConfirmedFlushLsn()) + +proc confirmFlushed*(conn: PgConnection, lsn: Lsn): bool = + ## Record that received WAL up to and including ``lsn`` has been durably + ## persisted by the application, so automatic keepalive replies (see + ## ``autoKeepaliveReply`` on ``startReplication``) report it as the flush/apply + ## position and let the server advance ``confirmed_flush_lsn`` and recycle WAL. + ## + ## Call this from the replication callback *after* the received changes are + ## durable. Until you do, the automatic reply acknowledges only *receipt* (the + ## receive LSN), never flush — so a crash re-streams the unprocessed WAL, + ## giving at-least-once delivery. Calls that would move the confirmed position + ## backwards are ignored, so duplicate or out-of-order confirmations are safe. + ## + ## ``lsn`` is clamped to the WAL actually received: you cannot have durably + ## persisted WAL you have not yet received, so an ``lsn`` beyond the highest + ## ``XLogData.receivedEndLsn`` observed confirms only up to that received + ## position (passing ``walEnd`` — which runs ahead of the data this message + ## carries — therefore confirms received WAL rather than over-advancing). + ## Because of this clamp the confirmed position can never exceed received WAL, + ## so automatic replies never emit a flush ahead of receive, and the call never + ## raises on an out-of-range LSN (an uncaught raise from the callback would + ## strand the stream). Must be called while the connection is ``csReplicating`` + ## (i.e. from the replication callback); calling it outside an active stream + ## raises ``PgConnectionError``. + ## + ## Returns ``true`` when the confirmed-flush position actually moved forward + ## (after clamping and the monotonic guard). ``false`` means the request was + ## ignored because it was behind the current confirmed position. + if conn.state != csReplicating: + raise newException( + PgConnectionError, + "confirmFlushed: connection is not in replicating state (state: " & $conn.state & + ")", + ) + # Clamp to received WAL: durably-persisted WAL can never exceed what was + # received. Clamping (rather than raising) keeps automatic replies from + # emitting flush ahead of receive without letting an out-of-range LSN — e.g. + # the readily-available ``walEnd`` — throw out of the callback and strand the + # connection in ``csReplicating``. The raw helper in pg_connection/types + # performs the clamp and the monotonic advance in one place. + return conn.confirmReplFlushed(lsn.toUInt64) + +proc sendConfirmedStatus(conn: PgConnection, receiveLsn: Lsn): Future[void] {.async.} = + ## Send a Standby Status Update carrying ``receiveLsn`` in the *receive* field + ## (which resets ``wal_sender_timeout`` on the server) and the + ## ``confirmFlushed`` position in flush/apply. The confirmed position is sent + ## verbatim — it is the stream's ``startLsn`` until ``confirmFlushed`` advances + ## it, so when nothing has been confirmed and ``startLsn`` was left at its + ## default ``InvalidLsn`` it is ``0/0``, which PostgreSQL reads as "position + ## unknown" and will not move the slot backwards. Either way flush never + ## advances past WAL the callback has not yet confirmed durable. Used by the + ## automatic keepalive reply and by ``stopReplication``. + ## + ## Only valid while ``csReplicating``, where ``confirmedFlushLsn`` is bounded + ## by received WAL (see ``confirmFlushed``), so flush never exceeds receive. + ## Calling this outside an active replication stream raises ``PgConnectionError``. + if conn.state != csReplicating: + raise newException( + PgConnectionError, + "sendConfirmedStatus: connection is not in replicating state (state: " & + $conn.state & ")", + ) + let flushLsn = conn.confirmedFlushLsn + await conn.sendStandbyStatusRaw( + receiveLsn, flushLsn, flushLsn, replyRequested = false ) - await conn.sendMsg(msg) + +proc resetReplLsnTracking(conn: PgConnection, startLsn: Lsn) = + ## Reset the per-stream confirmed-flush and max-received positions to the + ## resume point at the start of a stream, so a reused connection never inherits + ## a stale value from a previous stream. The confirmed-flush position then + ## advances only via ``confirmFlushed``; the max-received position advances as + ## ``XLogData`` arrives and bounds what ``confirmFlushed`` will accept. + conn.initReplLsnTracking(startLsn.toUInt64) + +proc handleReplicationData( + conn: PgConnection, + copyData: seq[byte], + autoKeepaliveReply: bool, + callback: ReplicationCallback, +): Future[void] {.async.} = + ## Process one CopyData frame from a replication stream: parse it, advance the + ## received-WAL position on ``XLogData`` (the single source of truth read by + ## ``confirmFlushed`` and the auto-reply), emit an automatic keepalive reply on + ## a ``PrimaryKeepalive`` with ``replyRequested`` when ``autoKeepaliveReply`` is + ## set, then invoke the user ``callback``. Shared by ``startReplication`` and + ## ``startPhysicalReplication`` so the received-tracking and auto-reply logic + ## lives in exactly one place. + let replMsg = parseReplicationMessage(copyData) + case replMsg.kind + of rmkXLogData: + let received = replMsg.xlogData.receivedEndLsn + discard conn.updateReplMaxReceivedLsn(received.toUInt64) + of rmkPrimaryKeepalive: + if autoKeepaliveReply and replMsg.keepalive.replyRequested: + await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn())) + await callback(replMsg) proc startReplication*( conn: PgConnection, @@ -756,25 +885,45 @@ proc startReplication*( ## ## The ``callback`` is invoked for each ``XLogData`` or ``PrimaryKeepalive`` ## message received. The callback is awaited, providing natural TCP backpressure. - ## Within the callback, use ``sendStandbyStatus`` to acknowledge received data. + ## From the callback, acknowledge durable progress with ``confirmFlushed`` (the + ## default path; see below). With ``autoKeepaliveReply = false`` you instead + ## drive replies yourself with ``sendStandbyStatus``; do not mix the two, since + ## the auto-reply would report a flush position behind your manual ACKs. ## ## When ``autoKeepaliveReply`` is true (the default), the library responds ## automatically to ``PrimaryKeepalive`` messages with ``replyRequested = true`` - ## *before* invoking the callback, sending the highest ``receivedEndLsn`` - ## (``startLsn + data.len``) observed so far across received ``XLogData`` - ## messages — or the caller-supplied ``startLsn`` if no ``XLogData`` has - ## arrived yet — as receive/flush/apply LSN. This prevents silent disconnects - ## from ``wal_sender_timeout`` when the callback is slow. The keepalive is - ## still delivered to the callback. Set ``autoKeepaliveReply = false`` to - ## manage replies manually — for example, when the flush/apply LSN must - ## reflect callback-side progress (durable writes) rather than what has merely - ## been received from the wire. + ## *before* invoking the callback. The reply reports the highest + ## ``receivedEndLsn`` (``startLsn + data.len``) observed so far across received + ## ``XLogData`` messages — or the caller-supplied ``startLsn`` if no + ## ``XLogData`` has arrived yet — as the **receive** LSN, which resets + ## ``wal_sender_timeout`` and prevents silent disconnects when the callback is + ## slow. The **flush/apply** LSN, however, carries only what you have confirmed + ## durable via ``confirmFlushed`` (initially ``startLsn``), *not* the receive + ## LSN. This keeps ``confirmed_flush_lsn`` from advancing past WAL the callback + ## has not yet persisted, so a crash re-streams unprocessed changes + ## (at-least-once delivery). The keepalive is still delivered to the callback. ## - ## If no ``XLogData`` has arrived and ``startLsn`` was left at its default - ## ``InvalidLsn`` (``0/0``), the auto-reply will carry ``0/0`` for - ## receive/flush/apply. PostgreSQL treats this as "position unknown" and will - ## not move ``confirmed_flush_lsn`` backwards, so the reply is still useful - ## for resetting ``wal_sender_timeout`` without risking data loss. + ## To advance the slot, call ``confirmFlushed(conn, lsn)`` from the callback + ## once the received changes are durable. The confirmed position reaches the + ## server on the next reply-requested keepalive and on ``stopReplication`` (a + ## clean stop flushes it), not on the ``confirmFlushed`` call itself. Set + ## ``autoKeepaliveReply = false`` to manage replies entirely by hand with + ## ``sendStandbyStatus`` instead — for example, to batch acknowledgements or + ## report apply separately from flush. + ## + ## Until ``confirmFlushed`` is called and while ``startLsn`` is at its default + ## ``InvalidLsn`` (``0/0``), the auto-reply carries ``0/0`` for flush/apply. + ## PostgreSQL treats this as "position unknown" and will not move + ## ``confirmed_flush_lsn`` backwards, so the reply is still useful for resetting + ## ``wal_sender_timeout`` without risking data loss. + ## + ## **Synchronous standbys:** because the auto-reply reports receive and + ## flush/apply separately, a consumer listed in ``synchronous_standby_names`` + ## (with ``synchronous_commit`` at ``on``/``remote_write``/``remote_apply``) + ## that never calls ``confirmFlushed`` keeps ``wal_sender_timeout`` reset via + ## the receive field yet never advances flush — so the primary's ``COMMIT``s + ## block indefinitely waiting for a flush confirmation that never arrives. + ## Call ``confirmFlushed`` promptly (or manage replies manually) in that setup. ## ## If the auto-reply itself fails (for example, the connection is lost ## between receiving the keepalive and writing the Standby Status Update), @@ -856,13 +1005,16 @@ proc startReplication*( discard await conn.fillRecvBuf() - # Highest end LSN of WAL data actually received from the wire — computed as - # XLogData.startLsn + data.len, *not* XLogData.walEnd. ``walEnd`` is the - # server's current WAL position at the time the message was sent and can be - # ahead of the bytes this message carries; acknowledging ``walEnd`` would + # Track the highest end LSN of WAL data actually received from the wire — + # computed as XLogData.startLsn + data.len, *not* XLogData.walEnd. ``walEnd`` + # is the server's current WAL position at the time the message was sent and can + # be ahead of the bytes this message carries; acknowledging ``walEnd`` would # falsely advance ``confirmed_flush_lsn`` past unprocessed WAL and cause data - # loss on slot restart. - var lastReceivedLsn: Lsn = startLsn + # loss on slot restart. The position lives on the connection (the single source + # of truth, read by confirmFlushed and the auto-reply); reset it and the + # confirmed-flush position to the resume point so a reused connection does not + # inherit a stale value. + conn.resetReplLsnTracking(startLsn) # Streaming loop block recvLoop: @@ -871,16 +1023,7 @@ proc startReplication*( let msg = opt.get case msg.kind of bmkCopyData: - let replMsg = parseReplicationMessage(msg.copyData) - case replMsg.kind - of rmkXLogData: - let received = replMsg.xlogData.receivedEndLsn - if received > lastReceivedLsn: - lastReceivedLsn = received - of rmkPrimaryKeepalive: - if autoKeepaliveReply and replMsg.keepalive.replyRequested: - await sendStandbyStatus(conn, lastReceivedLsn) - await callback(replMsg) + await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback) of bmkCopyDone: # Server ended the stream; reply with CopyDone before draining await conn.sendMsg(@copyDoneMsg) @@ -918,15 +1061,33 @@ proc startReplication*( await conn.fillRecvBuf() proc stopReplication*(conn: PgConnection): Future[void] {.async.} = - ## Send CopyDone to gracefully terminate the replication stream. - ## The server will respond with CopyDone and ReadyForQuery, which - ## will be handled by the ``startReplication`` recv loop. + ## Gracefully terminate the replication stream. + ## + ## Before sending CopyDone, this flushes the latest ``confirmFlushed`` position + ## to the server (receive = highest WAL received, flush/apply = confirmed) so a + ## clean shutdown does not lose the final acknowledgement. ``confirmFlushed`` + ## only records locally; without this flush the confirmed position would reach + ## the server only on the next ``PrimaryKeepalive(replyRequested)``, which may + ## never arrive before stop — leaving the slot behind and re-streaming the last + ## batch on restart. When nothing has been confirmed the flush is the stream's + ## ``startLsn`` (``0/0`` only when ``startLsn`` was left at its default + ## ``InvalidLsn``, which PostgreSQL reads as "position unknown" and will not + ## move the slot backwards), so manual-ACK callers are unaffected. + ## + ## The server responds with CopyDone and ReadyForQuery, which are handled by + ## the ``startReplication`` recv loop. + ## + ## If flushing the confirmed position fails (for example because the + ## connection is already lost), the exception propagates and CopyDone is not + ## sent. In that situation the server has already dropped the connection, so + ## the missing CopyDone does not change the outcome. if conn.state != csReplicating: raise newException( PgConnectionError, "stopReplication: connection is not in replicating state (state: " & $conn.state & ")", ) + await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn())) await conn.sendMsg(@copyDoneMsg) proc startPhysicalReplication*( @@ -950,8 +1111,15 @@ proc startPhysicalReplication*( ## pgoutput decoding applies. ## ## ``autoKeepaliveReply`` behaves identically to ``startReplication``: when - ## true, ``PrimaryKeepalive(replyRequested=true)`` is acknowledged with the - ## highest observed ``receivedEndLsn`` before the callback is invoked. + ## true, ``PrimaryKeepalive(replyRequested=true)`` is answered before the + ## callback runs, reporting the highest observed ``receivedEndLsn`` as the + ## receive LSN and the ``confirmFlushed`` position (initially ``startLsn``) as + ## flush/apply. For physical replication the flush LSN governs how much WAL the + ## primary may recycle, so call ``confirmFlushed`` only once that WAL is safely + ## on durable storage. A physical standby listed in ``synchronous_standby_names`` + ## that relies on the auto-reply must likewise call ``confirmFlushed`` (or reply + ## manually), or the primary's synchronous ``COMMIT``s will block waiting for a + ## flush position that never advances. ## ## On a timeline switch the server may send a final result set describing ## the next timeline (``RowDescription`` + ``DataRow`` + ``CommandComplete``) @@ -998,8 +1166,10 @@ proc startPhysicalReplication*( await conn.fillRecvBuf() # Highest end LSN of WAL data actually received — see startReplication for - # why we track ``startLsn + data.len`` rather than ``walEnd``. - var lastReceivedLsn: Lsn = startLsn + # why we track ``startLsn + data.len`` rather than ``walEnd``, and why the + # position lives on the connection. Reset it and the confirmed-flush position + # to the resume point so a reused connection does not inherit a stale value. + conn.resetReplLsnTracking(startLsn) block recvLoop: while true: @@ -1007,16 +1177,7 @@ proc startPhysicalReplication*( let msg = opt.get case msg.kind of bmkCopyData: - let replMsg = parseReplicationMessage(msg.copyData) - case replMsg.kind - of rmkXLogData: - let received = replMsg.xlogData.receivedEndLsn - if received > lastReceivedLsn: - lastReceivedLsn = received - of rmkPrimaryKeepalive: - if autoKeepaliveReply and replMsg.keepalive.replyRequested: - await sendStandbyStatus(conn, lastReceivedLsn) - await callback(replMsg) + await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback) of bmkCopyDone: await conn.sendMsg(@copyDoneMsg) break recvLoop diff --git a/examples/replication.nim b/examples/replication.nim index a97f3dc..bc6f378 100644 --- a/examples/replication.nim +++ b/examples/replication.nim @@ -81,17 +81,30 @@ proc main() {.async.} = else: echo "Other: ", pgMsg.kind - # Acknowledge progress. Use receivedEndLsn (startLsn + data.len), not - # walEnd: walEnd is the server's current WAL position and may point past - # what this message actually carries. - await replConn.sendStandbyStatus(msg.xlogData.receivedEndLsn) + # Mark the received changes durable. confirmFlushed only records the + # flush/apply position locally; the automatic keepalive reply + # (autoKeepaliveReply, default) carries it to the server on the next + # reply-requested keepalive, and stopReplication flushes it on a clean + # stop — together advancing the slot's confirmed_flush_lsn. Call it only + # *after* the change is durably processed (here, echoed above). Use + # receivedEndLsn (startLsn + data.len), not walEnd: walEnd is the server's + # current WAL position and may point past what this message actually + # carries. + # + # To advance the slot eagerly on every message instead, call + # `await replConn.sendStandbyStatus(msg.xlogData.receivedEndLsn)` here and + # pass `autoKeepaliveReply = false` to startReplication — otherwise the + # auto-reply would report a flush position behind your manual ACKs. + discard replConn.confirmFlushed(msg.xlogData.receivedEndLsn) inc msgCount if msgCount >= 10: await replConn.stopReplication() of rmkPrimaryKeepalive: - # startReplication's autoKeepaliveReply (default) already responds with - # the highest receivedEndLsn observed so far. No manual reply needed. + # startReplication's autoKeepaliveReply (default) answers keepalives for + # us: the receive LSN is the highest receivedEndLsn observed, while + # flush/apply reflect the confirmFlushed position set above. No manual + # reply needed. discard echo "Starting replication from ", slot.consistentPoint diff --git a/tests/mock_pg_server.nim b/tests/mock_pg_server.nim index 0337a87..592f915 100644 --- a/tests/mock_pg_server.nim +++ b/tests/mock_pg_server.nim @@ -17,6 +17,9 @@ when hasAsyncDispatch: # Types and low-level transport +type AutoKeepaliveResult* = + tuple[msgType: char, receive: int64, flush: int64, apply: int64] + when hasChronos: type MockServer* = object @@ -223,6 +226,48 @@ proc buildErrorResponse*(sqlState, message: string): seq[byte] = body.add(0'u8) # field list terminator buildBackendMsg('E', body) +# Replication (CopyBothResponse / CopyData) builders and decoders, shared by the +# replication test suites. + +proc buildCopyBothResponse*(): seq[byte] = + ## CopyBothResponse: format(1=binary) + numCols(0). + var body: seq[byte] + body.add(1'u8) + body.addInt16(0'i16) + buildBackendMsg('W', body) + +proc buildCopyData*(payload: openArray[byte]): seq[byte] = + buildBackendMsg('d', payload) + +proc buildXLogData*(startLsn, walEnd, sendTime: int64, walData: seq[byte]): seq[byte] = + ## CopyData('w' + startLsn + walEnd + sendTime + walData). + var payload: seq[byte] + payload.add(byte('w')) + payload.addInt64(startLsn) + payload.addInt64(walEnd) + payload.addInt64(sendTime) + payload.add(walData) + buildCopyData(payload) + +proc buildKeepalive*(walEnd, sendTime: int64, replyRequested: bool): seq[byte] = + ## CopyData('k' + walEnd + sendTime + replyRequested). + var payload: seq[byte] + payload.add(byte('k')) + payload.addInt64(walEnd) + payload.addInt64(sendTime) + payload.add(if replyRequested: 1'u8 else: 0'u8) + buildCopyData(payload) + +proc buildCopyDone*(): seq[byte] = + @[byte('c'), 0'u8, 0'u8, 0'u8, 4'u8] + +proc decodeStandbyStatus*(body: seq[byte]): tuple[receive, flush, apply: int64] = + ## Frontend CopyData body for a Standby Status Update: + ## 'r' + receive(8) + flush(8) + apply(8) + clock(8) + replyRequested(1). + doAssert body.len == 1 + 8 + 8 + 8 + 8 + 1, "unexpected standby status size" + doAssert body[0] == byte('r'), "expected standby status type byte 'r'" + (decodeInt64(body, 1), decodeInt64(body, 9), decodeInt64(body, 17)) + # Frontend readers proc drainStartupMessage*(client: MockClient) {.async.} = @@ -244,6 +289,39 @@ proc drainFrontendMessage*( if msgLen > 4: result.body = await readN(client, msgLen - 4) +proc runAutoKeepaliveServer*( + client: MockClient, + startLsn, walEnd, keepaliveWalEnd: int64, + walData: seq[byte], + endStream: bool = true, +): Future[AutoKeepaliveResult] {.async.} = + ## Server-side helper for auto-keepalive reply tests. Drains the + ## START_REPLICATION query, sends CopyBothResponse + XLogData + + ## PrimaryKeepalive(replyRequested=true), captures the client's Standby Status + ## Update reply, and optionally ends the stream with CopyDone + ReadyForQuery + ## (consuming the client's CopyDone reply). Returns the decoded reply fields. + discard await drainFrontendMessage(client) # START_REPLICATION + var burst: seq[byte] + burst.add(buildCopyBothResponse()) + burst.add(buildXLogData(startLsn, walEnd, 0, walData)) + burst.add(buildKeepalive(keepaliveWalEnd, 0, replyRequested = true)) + await sendBytes(client, burst) + let reply = await drainFrontendMessage(client) + var observed: AutoKeepaliveResult + observed.msgType = reply.msgType + if reply.msgType == 'd': + let ssu = decodeStandbyStatus(reply.body) + observed.receive = ssu.receive + observed.flush = ssu.flush + observed.apply = ssu.apply + if endStream: + var tail: seq[byte] + tail.add(buildCopyDone()) + tail.add(buildReadyForQuery('I')) + await sendBytes(client, tail) + discard await drainFrontendMessage(client) # client's CopyDone + return observed + # Full handshake shortcut proc sendFullHandshake*( diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index cd92df6..9bc29ea 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -10225,7 +10225,9 @@ suite "E2E: Logical Replication": else: discard of rmkPrimaryKeepalive: - # autoKeepaliveReply (default) responds with receivedEndLsn. + # autoKeepaliveReply (default) already replied: receivedEndLsn in the + # receive field (resets wal_sender_timeout); flush/apply track the + # confirmFlushed position. No manual reply needed. discard # Insert a row from the writer connection after a short delay @@ -10276,8 +10278,9 @@ suite "E2E: Logical Replication": of rmkXLogData: await replConn.sendStandbyStatus(msg.xlogData.receivedEndLsn) of rmkPrimaryKeepalive: - # Stop immediately on first keepalive. - # autoKeepaliveReply (default) handles the ACK with receivedEndLsn. + # Stop immediately on first keepalive. autoKeepaliveReply (default) + # already replied (receive = receivedEndLsn); flush/apply track the + # confirmFlushed position. await replConn.stopReplication() await replConn.startReplication( diff --git a/tests/test_physical_replication.nim b/tests/test_physical_replication.nim index 29d86c9..9f2790f 100644 --- a/tests/test_physical_replication.nim +++ b/tests/test_physical_replication.nim @@ -19,27 +19,6 @@ proc mockConfig(port: int): ConnConfig = host: "127.0.0.1", port: port, user: "test", database: "test", sslMode: sslDisable ) -proc buildCopyBothResponse(): seq[byte] = - var body: seq[byte] - body.add(1'u8) - body.addInt16(0'i16) - buildBackendMsg('W', body) - -proc buildCopyData(payload: openArray[byte]): seq[byte] = - buildBackendMsg('d', payload) - -proc buildXLogData(startLsn, walEnd, sendTime: int64, walData: seq[byte]): seq[byte] = - var payload: seq[byte] - payload.add(byte('w')) - payload.addInt64(startLsn) - payload.addInt64(walEnd) - payload.addInt64(sendTime) - payload.add(walData) - buildCopyData(payload) - -proc buildCopyDone(): seq[byte] = - @[byte('c'), 0'u8, 0'u8, 0'u8, 4'u8] - proc readStartupMessage(client: MockClient): Future[seq[byte]] {.async.} = ## Same as drainStartupMessage but returns the body so tests can inspect ## the startup parameters. @@ -69,6 +48,10 @@ var capturedRaised: bool = false var capturedFinalState: PgConnState = csClosed var capturedFilename: string = "" var capturedContent: seq[byte] +var physObservedReceiveLsn: int64 = -1 +var physObservedFlushLsn: int64 = -1 +var physObservedApplyLsn: int64 = -1 +var physObservedReplyMsgType: char = '\0' suite "Physical replication: SQL building": proc runOneSqlTest( @@ -235,6 +218,59 @@ suite "sendCopyData public API": waitFor testBody() check capturedRaised +suite "Physical replication: auto keepalive reply": + test "auto-reply uses receivedEndLsn and confirmedFlushLsn": + physObservedReceiveLsn = -1 + physObservedFlushLsn = -1 + physObservedApplyLsn = -1 + physObservedReplyMsgType = '\0' + capturedReceivedKinds.setLen(0) + + const + physStartLsn = 0x0000_0000_0000_1000'i64 + physWalData: seq[byte] = @[1'u8, 2, 3] + physReceivedEndLsn = physStartLsn + physWalData.len + physWalEnd = 0x0000_0000_0000_5000'i64 + physKeepaliveWalEnd = 0x0000_0000_0000_9999'i64 + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + let ssu = await runAutoKeepaliveServer( + st, physStartLsn, physWalEnd, physKeepaliveWalEnd, physWalData + ) + physObservedReplyMsgType = ssu.msgType + physObservedReceiveLsn = ssu.receive + physObservedFlushLsn = ssu.flush + physObservedApplyLsn = ssu.apply + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + capturedReceivedKinds.add(msg.kind) + if msg.kind == rmkXLogData: + discard conn.confirmFlushed(msg.xlogData.receivedEndLsn) + + await conn.startPhysicalReplication( + startLsn = Lsn(uint64(physStartLsn)), callback = cb + ) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check physObservedReplyMsgType == 'd' + check physObservedReceiveLsn == physReceivedEndLsn + check physObservedReceiveLsn != physWalEnd + check physObservedReceiveLsn != physKeepaliveWalEnd + check physObservedFlushLsn == physReceivedEndLsn + check physObservedApplyLsn == physReceivedEndLsn + check capturedReceivedKinds == @[rmkXLogData, rmkPrimaryKeepalive] + suite "connectReplication mode": test "rmPhysical sends replication=true in StartupMessage": capturedStartupBody.setLen(0) diff --git a/tests/test_replication_keepalive.nim b/tests/test_replication_keepalive.nim index d42e0f7..39bb703 100644 --- a/tests/test_replication_keepalive.nim +++ b/tests/test_replication_keepalive.nim @@ -2,14 +2,16 @@ ## ## Verifies that when `startReplication` is invoked with `autoKeepaliveReply = true` ## (the default), the library responds to `PrimaryKeepalive(replyRequested=true)` -## messages automatically, using the highest `receivedEndLsn` -## (`XLogData.startLsn + data.len`) observed so far — never the server's -## `walEnd` (neither the keepalive's nor the XLogData's). Also verifies the -## opt-out path. +## messages automatically, reporting the highest `receivedEndLsn` +## (`XLogData.startLsn + data.len`) observed so far in the *receive* field — +## never the server's `walEnd` (neither the keepalive's nor the XLogData's). +## Also verifies that flush/apply only reflect the LSN confirmed durable via +## `confirmFlushed` (so merely-received WAL does not advance +## `confirmed_flush_lsn`, preserving at-least-once delivery) and the opt-out path. import std/unittest -import ../async_postgres/[async_backend, pg_protocol, pg_replication] +import ../async_postgres/[async_backend, pg_replication] import ../async_postgres/pg_connection {.all.} import ./mock_pg_server @@ -19,44 +21,6 @@ proc mockConfig(port: int): ConnConfig = host: "127.0.0.1", port: port, user: "test", database: "test", sslMode: sslDisable ) -proc buildCopyBothResponse(): seq[byte] = - ## CopyBothResponse: format(1=binary) + numCols(0). - var body: seq[byte] - body.add(1'u8) - body.addInt16(0'i16) - buildBackendMsg('W', body) - -proc buildCopyData(payload: openArray[byte]): seq[byte] = - buildBackendMsg('d', payload) - -proc buildXLogData(startLsn, walEnd, sendTime: int64, walData: seq[byte]): seq[byte] = - ## CopyData('w' + startLsn + walEnd + sendTime + walData). - var payload: seq[byte] - payload.add(byte('w')) - payload.addInt64(startLsn) - payload.addInt64(walEnd) - payload.addInt64(sendTime) - payload.add(walData) - buildCopyData(payload) - -proc buildKeepalive(walEnd, sendTime: int64, replyRequested: bool): seq[byte] = - ## CopyData('k' + walEnd + sendTime + replyRequested). - var payload: seq[byte] - payload.add(byte('k')) - payload.addInt64(walEnd) - payload.addInt64(sendTime) - payload.add(if replyRequested: 1'u8 else: 0'u8) - buildCopyData(payload) - -proc buildCopyDone(): seq[byte] = - @[byte('c'), 0'u8, 0'u8, 0'u8, 4'u8] - -proc decodeStandbyStatusReceiveLsn(body: seq[byte]): int64 = - ## Frontend CopyData body for a Standby Status Update: 'r' + receiveLsn(8) + ... - doAssert body.len == 1 + 8 + 8 + 8 + 8 + 1, "unexpected standby status size" - doAssert body[0] == byte('r'), "expected standby status type byte 'r'" - decodeInt64(body, 1) - const # startLsn of the XLogData burst the mock server sends. testStartLsn = 0x0000_0000_0000_1000'i64 @@ -73,6 +37,8 @@ const # Captured by the closure-typed `ReplicationCallback`. Kept at module scope so # the callback body can mutate state without forcing a non-gcsafe seq capture. var observedReceiveLsn: int64 = -1 +var observedFlushLsn: int64 = -1 +var observedApplyLsn: int64 = -1 var observedReplyMsgType: char = '\0' var callbackKinds: seq[ReplicationMessageKind] var keepaliveSeen: bool @@ -81,6 +47,8 @@ var unexpectedFrontendMsgType: char = '\0' suite "Replication: auto keepalive reply": test "auto-reply uses receivedEndLsn (startLsn+data.len), not any walEnd": observedReceiveLsn = -1 + observedFlushLsn = -1 + observedApplyLsn = -1 observedReplyMsgType = '\0' callbackKinds.setLen(0) @@ -89,28 +57,16 @@ suite "Replication: auto keepalive reply": proc serverHandler() {.async.} = let st = await acceptAndReady(ms) - # Drain START_REPLICATION query. - discard await drainFrontendMessage(st) - # Send CopyBothResponse + XLogData + Keepalive(replyRequested=1). - # XLogData.walEnd is deliberately set far ahead of startLsn+data.len - # so the test would fail if the client incorrectly acknowledged it. - var burst: seq[byte] - burst.add(buildCopyBothResponse()) - burst.add(buildXLogData(testStartLsn, testXLogWalEnd, 0, testWalData)) - burst.add(buildKeepalive(testKeepaliveWalEnd, 0, replyRequested = true)) - await sendBytes(st, burst) - # Expect the client to auto-reply with a Standby Status Update. - let reply = await drainFrontendMessage(st) - observedReplyMsgType = reply.msgType - if reply.msgType == 'd': - observedReceiveLsn = decodeStandbyStatusReceiveLsn(reply.body) - # End the stream cleanly. - var tail: seq[byte] - tail.add(buildCopyDone()) - tail.add(buildReadyForQuery('I')) - await sendBytes(st, tail) - # Client will reply with CopyDone before draining; consume it. - discard await drainFrontendMessage(st) + # Send CopyBothResponse + XLogData + Keepalive(replyRequested=1), capture + # the auto-reply, and end the stream cleanly. XLogData.walEnd is set far + # ahead of startLsn+data.len so the test fails if it is acknowledged. + let ssu = await runAutoKeepaliveServer( + st, testStartLsn, testXLogWalEnd, testKeepaliveWalEnd, testWalData + ) + observedReplyMsgType = ssu.msgType + observedReceiveLsn = ssu.receive + observedFlushLsn = ssu.flush + observedApplyLsn = ssu.apply await closeClient(st) let serverFut = serverHandler() @@ -130,6 +86,12 @@ suite "Replication: auto keepalive reply": # Regression guard: must not be either walEnd value. check observedReceiveLsn != testXLogWalEnd check observedReceiveLsn != testKeepaliveWalEnd + # Regression: flush/apply must NOT advance to merely-received WAL. + # No confirmFlushed was called and startLsn defaulted to 0/0, so the auto + # reply must report 0/0 for flush/apply — PostgreSQL reads it as "position + # unknown" and will not move confirmed_flush_lsn past unprocessed WAL. + check observedFlushLsn == 0 + check observedApplyLsn == 0 check callbackKinds == @[rmkXLogData, rmkPrimaryKeepalive] test "auto-reply disabled: library does not send Standby Status": @@ -177,3 +139,194 @@ suite "Replication: auto keepalive reply": waitFor testBody() check keepaliveSeen check unexpectedFrontendMsgType == '\0' + + test "confirmFlushed advances the auto-reply flush/apply LSN": + observedReceiveLsn = -1 + observedFlushLsn = -1 + observedApplyLsn = -1 + observedReplyMsgType = '\0' + callbackKinds.setLen(0) + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + # XLogData then a keepalive(replyRequested). The client confirms only + # part of the received range as durable (startLsn, deliberately below the + # received end LSN) in its XLogData callback, so the auto-reply to the + # following keepalive must carry that confirmed LSN as flush/apply while + # still reporting the full received LSN as receive. + let ssu = await runAutoKeepaliveServer( + st, testStartLsn, testXLogWalEnd, testKeepaliveWalEnd, testWalData + ) + observedReplyMsgType = ssu.msgType + observedReceiveLsn = ssu.receive + observedFlushLsn = ssu.flush + observedApplyLsn = ssu.apply + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + callbackKinds.add(msg.kind) + if msg.kind == rmkXLogData: + # Confirm durability only up to startLsn — deliberately *below* the + # received end LSN — so the auto-reply's flush/apply differ from its + # receive field and we prove they track confirmFlushed, not receipt. + discard conn.confirmFlushed(msg.xlogData.startLsn) + + await conn.startReplication("test_slot", callback = cb) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check observedReplyMsgType == 'd' + # The receive field still reports the full received LSN (this resets + # wal_sender_timeout on the server). + check observedReceiveLsn == testReceivedEndLsn + # flush/apply report only the lower LSN confirmed durable via confirmFlushed + # (testStartLsn), proving they track the confirmed position independently of + # receive — not merely echo the received LSN as the removed flush=receive + # behavior did. This also makes the apply assertion meaningful: apply follows + # flush, not the (larger) receive LSN. + check observedFlushLsn == testStartLsn + check observedApplyLsn == testStartLsn + check callbackKinds == @[rmkXLogData, rmkPrimaryKeepalive] + + test "confirmFlushed ignores backward and duplicate confirmations": + observedFlushLsn = -1 + observedApplyLsn = -1 + observedReplyMsgType = '\0' + callbackKinds.setLen(0) + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + let ssu = await runAutoKeepaliveServer( + st, testStartLsn, testXLogWalEnd, testKeepaliveWalEnd, testWalData + ) + observedReplyMsgType = ssu.msgType + observedFlushLsn = ssu.flush + observedApplyLsn = ssu.apply + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + callbackKinds.add(msg.kind) + if msg.kind == rmkXLogData: + # Confirm the full received range, then attempt a duplicate and a + # backward confirmation. The confirmed position must stay at the + # highest valid value. + discard conn.confirmFlushed(msg.xlogData.receivedEndLsn) + discard conn.confirmFlushed(msg.xlogData.receivedEndLsn) + discard conn.confirmFlushed(msg.xlogData.startLsn) + + await conn.startReplication("test_slot", callback = cb) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check observedReplyMsgType == 'd' + check observedFlushLsn == testReceivedEndLsn + check observedApplyLsn == testReceivedEndLsn + check callbackKinds == @[rmkXLogData, rmkPrimaryKeepalive] + + test "confirmFlushed clamps an LSN beyond the WAL actually received": + observedReceiveLsn = -1 + observedFlushLsn = -1 + observedApplyLsn = -1 + observedReplyMsgType = '\0' + callbackKinds.setLen(0) + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + # The callback confirms one byte past received WAL; confirmFlushed clamps + # it to received and does NOT raise, so the auto-reply still fires. + let ssu = await runAutoKeepaliveServer( + st, testStartLsn, testXLogWalEnd, testKeepaliveWalEnd, testWalData + ) + observedReplyMsgType = ssu.msgType + observedReceiveLsn = ssu.receive + observedFlushLsn = ssu.flush + observedApplyLsn = ssu.apply + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + callbackKinds.add(msg.kind) + if msg.kind == rmkXLogData: + # receivedEndLsn is testStartLsn + testWalData.len. Confirming one + # byte past that must clamp to received WAL — never advancing flush + # beyond it — and must not raise (an uncaught raise would strand the + # connection in csReplicating). + discard + conn.confirmFlushed(Lsn(uint64(msg.xlogData.receivedEndLsn) + 1'u64)) + + await conn.startReplication("test_slot", callback = cb) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check observedReplyMsgType == 'd' + # receive still reports the full received LSN (resets wal_sender_timeout). + check observedReceiveLsn == testReceivedEndLsn + # The over-range confirmation was clamped to received WAL: flush/apply equal + # the received end LSN, never the (received + 1) value passed in. + check observedFlushLsn == testReceivedEndLsn + check observedApplyLsn == testReceivedEndLsn + check callbackKinds == @[rmkXLogData, rmkPrimaryKeepalive] + + test "confirmFlushed returns whether the position advanced": + var observedMsgType: char = '\0' + var firstAdvanced: bool = false + var secondAdvanced: bool = true + var thirdAdvanced: bool = true + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + let ssu = await runAutoKeepaliveServer( + st, testStartLsn, testXLogWalEnd, testKeepaliveWalEnd, testWalData + ) + observedMsgType = ssu.msgType + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + if msg.kind == rmkXLogData: + # First confirmation moves the position forward. + firstAdvanced = conn.confirmFlushed(msg.xlogData.startLsn) + # Second confirmation to the same LSN is a no-op. + secondAdvanced = conn.confirmFlushed(msg.xlogData.startLsn) + # Backward confirmation is also a no-op. + thirdAdvanced = + conn.confirmFlushed(Lsn(uint64(msg.xlogData.startLsn) - 1'u64)) + + await conn.startReplication("test_slot", callback = cb) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check observedMsgType == 'd' + check firstAdvanced + check not secondAdvanced + check not thirdAdvanced