From aa48e31c5c6aa06dc264e243b28830117bd7a5f4 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Wed, 17 Jun 2026 19:11:45 +0900 Subject: [PATCH 1/2] Add proactive statusInterval to replication streaming --- async_postgres/pg_replication.nim | 111 ++++++++++++++++++++++++++- tests/test_replication_keepalive.nim | 77 +++++++++++++++++++ 2 files changed, 186 insertions(+), 2 deletions(-) diff --git a/async_postgres/pg_replication.nim b/async_postgres/pg_replication.nim index 3d5beb8..83565e7 100644 --- a/async_postgres/pg_replication.nim +++ b/async_postgres/pg_replication.nim @@ -850,6 +850,69 @@ proc resetReplLsnTracking(conn: PgConnection, startLsn: Lsn) = ## ``XLogData`` arrives and bounds what ``confirmFlushed`` will accept. conn.initReplLsnTracking(startLsn.toUInt64) +proc replFillRecvBuf( + conn: PgConnection, statusInterval: async_backend.Duration, lastStatusSent: Moment +): Future[void] {.async.} = + ## Wait for more replication data, but wake early enough that the caller can + ## emit a proactive Standby Status Update when ``statusInterval`` is set. + ## + ## With ``statusInterval == ZeroDuration`` (the default) this blocks until data + ## arrives, exactly like a bare ``fillRecvBuf``. + ## + ## With a positive ``statusInterval`` under **chronos**, the read is bounded by + ## the time left until the next status update is due; on the resulting + ## ``AsyncTimeoutError`` the read is cleanly cancelled (no bytes consumed, so + ## the stream stays intact) and we return normally, letting the caller send the + ## update and resume. Under **asyncdispatch** a timed read cannot be cancelled — + ## the abandoned read would consume and drop bytes, desyncing the stream — so we + ## fall back to an unbounded read. The caller still emits status updates + ## opportunistically after each received message, which covers a busy stream + ## (where WAL actually accumulates); a fully idle asyncdispatch stream sends + ## nothing until the next message arrives. + if statusInterval <= ZeroDuration: + await conn.fillRecvBuf() + return + when hasChronos: + let sinceLast = Moment.now() - lastStatusSent + let remaining = + if sinceLast >= statusInterval: + async_backend.milliseconds(1) + else: + statusInterval - sinceLast + try: + await conn.fillRecvBuf(remaining) + except AsyncTimeoutError: + discard # woke to send a periodic status update, not a transport failure + else: + await conn.fillRecvBuf() + +proc maybeSendPeriodicStatus( + conn: PgConnection, + autoKeepaliveReply: bool, + statusInterval: async_backend.Duration, + lastStatusSent: Moment, +): Future[Moment] {.async.} = + ## Emit a proactive Standby Status Update if ``statusInterval`` has elapsed + ## since the last one, so ``confirmed_flush_lsn`` advances (and + ## ``wal_sender_timeout`` resets) even when the server never requests a reply — + ## e.g. a server configured with ``wal_sender_timeout = 0``. The update reports + ## the highest received LSN as receive and the ``confirmFlushed`` position as + ## flush/apply, identical to the automatic keepalive reply, so it never advances + ## flush past WAL the callback has confirmed durable. Returns the timestamp to + ## record as the new ``lastStatusSent`` (unchanged when nothing was sent). + ## + ## Only active together with ``autoKeepaliveReply``: under manual reply + ## management the caller owns the cadence and the reported LSNs via + ## ``sendStandbyStatus``. + if not autoKeepaliveReply or statusInterval <= ZeroDuration: + return lastStatusSent + if conn.state != csReplicating: + return lastStatusSent + if Moment.now() - lastStatusSent < statusInterval: + return lastStatusSent + await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn())) + return Moment.now() + proc handleReplicationData( conn: PgConnection, copyData: seq[byte], @@ -879,6 +942,7 @@ proc startReplication*( startLsn: Lsn = InvalidLsn, options: seq[(string, string)] = @[], autoKeepaliveReply: bool = true, + statusInterval: async_backend.Duration = ZeroDuration, callback: ReplicationCallback, ): Future[void] {.async.} = ## Begin logical replication streaming from the given slot. @@ -925,6 +989,22 @@ proc startReplication*( ## block indefinitely waiting for a flush confirmation that never arrives. ## Call ``confirmFlushed`` promptly (or manage replies manually) in that setup. ## + ## **Proactive status interval:** ``statusInterval`` (``ZeroDuration`` = off, + ## the default) makes the library send a Standby Status Update on its own at + ## least that often, in addition to answering reply-requested keepalives. The + ## proactive update reports the highest received LSN as receive and the + ## ``confirmFlushed`` position as flush/apply — same as the auto-reply — so it + ## advances ``confirmed_flush_lsn`` (letting the server recycle WAL) without + ## ever flushing past unconfirmed WAL. Set it when the server uses + ## ``wal_sender_timeout = 0`` (or a long timeout): such a server never asks for + ## a reply, so without a proactive interval the slot only advances on + ## ``stopReplication`` and WAL accumulates meanwhile. It is honoured only when + ## ``autoKeepaliveReply`` is true; under manual reply management drive the + ## cadence yourself with ``sendStandbyStatus``. Under **asyncdispatch** the + ## interval only fires while messages are flowing (it cannot safely interrupt a + ## blocked read, so a fully idle stream sends nothing until the next message); + ## **chronos** honours it even on a completely idle stream. + ## ## If the auto-reply itself fails (for example, the connection is lost ## between receiving the keepalive and writing the Standby Status Update), ## the exception is propagated out of ``startReplication`` and the callback @@ -1015,6 +1095,7 @@ proc startReplication*( # confirmed-flush position to the resume point so a reused connection does not # inherit a stale value. conn.resetReplLsnTracking(startLsn) + var lastStatusSent = Moment.now() # Streaming loop block recvLoop: @@ -1038,9 +1119,18 @@ proc startReplication*( return else: discard + # Send a proactive status update if one is due after draining this batch + # (covers a busy stream on both backends). + lastStatusSent = await conn.maybeSendPeriodicStatus( + autoKeepaliveReply, statusInterval, lastStatusSent + ) if conn.state == csClosed: raise newException(PgConnectionError, "Connection closed during replication") - await conn.fillRecvBuf() + await conn.replFillRecvBuf(statusInterval, lastStatusSent) + # Send again after a timed wake with no new data (idle coverage on chronos). + lastStatusSent = await conn.maybeSendPeriodicStatus( + autoKeepaliveReply, statusInterval, lastStatusSent + ) # After CopyDone, drain to ReadyForQuery block drainLoop: @@ -1096,6 +1186,7 @@ proc startPhysicalReplication*( slotName: string = "", timeline: int32 = 0, autoKeepaliveReply: bool = true, + statusInterval: async_backend.Duration = ZeroDuration, callback: ReplicationCallback, ): Future[void] {.async.} = ## Begin **physical** replication streaming. @@ -1121,6 +1212,13 @@ proc startPhysicalReplication*( ## manually), or the primary's synchronous ``COMMIT``s will block waiting for a ## flush position that never advances. ## + ## ``statusInterval`` behaves as documented on ``startReplication``: a positive + ## value makes the standby send a proactive Standby Status Update at least that + ## often (receive = highest received, flush/apply = ``confirmFlushed``) so the + ## primary can recycle WAL even when it never requests a reply (e.g. + ## ``wal_sender_timeout = 0``); it is honoured only with ``autoKeepaliveReply``, + ## and on asyncdispatch only fires while messages are flowing. + ## ## On a timeline switch the server may send a final result set describing ## the next timeline (``RowDescription`` + ``DataRow`` + ``CommandComplete``) ## between ``CopyDone`` and ``ReadyForQuery``. This proc drains and discards @@ -1170,6 +1268,7 @@ proc startPhysicalReplication*( # position lives on the connection. Reset it and the confirmed-flush position # to the resume point so a reused connection does not inherit a stale value. conn.resetReplLsnTracking(startLsn) + var lastStatusSent = Moment.now() block recvLoop: while true: @@ -1191,11 +1290,19 @@ proc startPhysicalReplication*( return else: discard + # See startReplication: send a proactive status update when one is due, + # both after draining a batch and after a timed idle wake. + lastStatusSent = await conn.maybeSendPeriodicStatus( + autoKeepaliveReply, statusInterval, lastStatusSent + ) if conn.state == csClosed: raise newException( PgConnectionError, "Connection closed during physical replication" ) - await conn.fillRecvBuf() + await conn.replFillRecvBuf(statusInterval, lastStatusSent) + lastStatusSent = await conn.maybeSendPeriodicStatus( + autoKeepaliveReply, statusInterval, lastStatusSent + ) # After CopyDone, drain to ReadyForQuery. Accept the optional timeline-switch # result set (RowDescription + DataRow + CommandComplete) the server emits diff --git a/tests/test_replication_keepalive.nim b/tests/test_replication_keepalive.nim index 39bb703..585c820 100644 --- a/tests/test_replication_keepalive.nim +++ b/tests/test_replication_keepalive.nim @@ -330,3 +330,80 @@ suite "Replication: auto keepalive reply": check firstAdvanced check not secondAdvanced check not thirdAdvanced + +suite "Replication: proactive status interval": + test "statusInterval sends a Standby Status without a reply-requested keepalive": + # A server with wal_sender_timeout = 0 never requests a reply, so the slot + # only advances if the standby sends status updates on its own. With a + # positive statusInterval the library must emit a Standby Status Update + # (receive = received LSN, flush/apply = confirmFlushed) even though the + # server set replyRequested only never. Works on both backends: chronos via a + # timed idle wake, asyncdispatch via the post-message path nudged below. + observedReceiveLsn = -1 + observedFlushLsn = -1 + observedApplyLsn = -1 + observedReplyMsgType = '\0' + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + discard await drainFrontendMessage(st) # START_REPLICATION + var burst: seq[byte] + burst.add(buildCopyBothResponse()) + # XLogData only — crucially, no PrimaryKeepalive(replyRequested=true). + burst.add(buildXLogData(testStartLsn, testXLogWalEnd, 0, testWalData)) + await sendBytes(st, burst) + # Let the status interval (50ms) elapse, then send a non-reply keepalive + # to unblock the asyncdispatch read (which cannot wake on a timer); + # chronos has already emitted updates on its own by now. + await sleepAsync(milliseconds(150)) + await sendBytes( + st, buildKeepalive(testKeepaliveWalEnd, 0, replyRequested = false) + ) + let reply = await drainFrontendMessage(st) + observedReplyMsgType = reply.msgType + if reply.msgType == 'd': + let ssu = decodeStandbyStatus(reply.body) + observedReceiveLsn = ssu.receive + observedFlushLsn = ssu.flush + observedApplyLsn = ssu.apply + # End the stream. Drain any further proactive updates plus the client's + # CopyDone so its CopyDone send sees an open socket. + var tail: seq[byte] + tail.add(buildCopyDone()) + tail.add(buildReadyForQuery('I')) + await sendBytes(st, tail) + while true: + let m = + try: + await drainFrontendMessage(st) + except CatchableError: + break + if m.msgType == 'c': # client's CopyDone + break + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + if msg.kind == rmkXLogData: + discard conn.confirmFlushed(msg.xlogData.receivedEndLsn) + + await conn.startReplication( + "test_slot", statusInterval = milliseconds(50), callback = cb + ) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + # A proactive Standby Status Update arrived even though the server never set + # replyRequested: receive carries the received LSN, flush/apply the confirmed + # position (here equal, since the callback confirmed the full received range). + check observedReplyMsgType == 'd' + check observedReceiveLsn == testReceivedEndLsn + check observedFlushLsn == testReceivedEndLsn + check observedApplyLsn == testReceivedEndLsn From e77de2488abe9111c0f4cef32d037118fd29301d Mon Sep 17 00:00:00 2001 From: fox0430 Date: Wed, 17 Jun 2026 19:53:47 +0900 Subject: [PATCH 2/2] fix --- async_postgres/pg_connection/buffer_io.nim | 29 ++++++ async_postgres/pg_connection/types.nim | 6 ++ async_postgres/pg_replication.nim | 109 +++++++++++++++------ tests/test_physical_replication.nim | 81 +++++++++++++++ 4 files changed, 197 insertions(+), 28 deletions(-) diff --git a/async_postgres/pg_connection/buffer_io.nim b/async_postgres/pg_connection/buffer_io.nim index 40164a3..65a2479 100644 --- a/async_postgres/pg_connection/buffer_io.nim +++ b/async_postgres/pg_connection/buffer_io.nim @@ -267,6 +267,35 @@ proc fillRecvBuf*( conn.recvBuf.setLen(oldLen + data.len) copyMem(addr conn.recvBuf[oldLen], addr data[0], data.len) +when hasChronos: + proc fillRecvBufDetached*(conn: PgConnection): Future[void] {.async.} = + ## Read one chunk into a private scratch buffer and append it to ``recvBuf`` + ## only once the read settles, leaving ``recvBuf`` parseable while the read is + ## still in flight. + ## + ## ``fillRecvBuf`` grows ``recvBuf`` by ``RecvBufSize`` up front to hand the + ## chronos ``readOnce`` a destination pointer, so a caller that parses + ## ``recvBuf`` before the read completes would see uninitialised tail bytes. + ## The replication status-interval path keeps a single read pending across + ## timer wakes and parses between them, so it reads through here instead (see + ## ``replFillRecvBuf``). On any read failure the connection is marked + ## ``csClosed`` before re-raising, matching ``fillRecvBuf``. + if conn.replReadScratch.len < RecvBufSize: + conn.replReadScratch.setLen(RecvBufSize) + let n = + try: + await conn.reader.readOnce(addr conn.replReadScratch[0], RecvBufSize) + except CatchableError as e: + conn.state = csClosed + raise e + if n == 0: + conn.state = csClosed + raise newException(PgConnectionError, "Connection closed by server") + conn.compactRecvBuf() + let oldLen = conn.recvBuf.len + conn.recvBuf.setLen(oldLen + n) + copyMem(addr conn.recvBuf[oldLen], addr conn.replReadScratch[0], n) + proc nextMessage*( conn: PgConnection, rowData: RowData = nil, rowCount: ptr int32 = nil ): Option[BackendMessage] {.raises: [PgProtocolError].} = diff --git a/async_postgres/pg_connection/types.nim b/async_postgres/pg_connection/types.nim index e8bd0af..6587971 100644 --- a/async_postgres/pg_connection/types.nim +++ b/async_postgres/pg_connection/types.nim @@ -284,6 +284,12 @@ type ## by `startReplication` / `startPhysicalReplication`. Used by ## `confirmFlushed` to reject confirmations beyond received WAL. ## Manipulated via the exported helpers in this module (see below). + replReadScratch*: seq[byte] + ## Reusable scratch buffer for `fillRecvBufDetached` (chronos only). The + ## proactive status-interval path keeps a single read in flight across + ## timer wakes; reading into this private buffer (instead of growing + ## `recvBuf` up front like `fillRecvBuf`) keeps `recvBuf` parseable while + ## that read is still pending. Allocated lazily to `RecvBufSize` and reused. QueryResult* = object ## Result of a query: field descriptions, row data, and command tag. diff --git a/async_postgres/pg_replication.nim b/async_postgres/pg_replication.nim index 83565e7..af34087 100644 --- a/async_postgres/pg_replication.nim +++ b/async_postgres/pg_replication.nim @@ -851,40 +851,62 @@ proc resetReplLsnTracking(conn: PgConnection, startLsn: Lsn) = conn.initReplLsnTracking(startLsn.toUInt64) proc replFillRecvBuf( - conn: PgConnection, statusInterval: async_backend.Duration, lastStatusSent: Moment -): Future[void] {.async.} = + conn: PgConnection, + statusInterval: async_backend.Duration, + lastStatusSent: Moment, + pendingRead: Future[void], +): Future[Future[void]] {.async.} = ## Wait for more replication data, but wake early enough that the caller can ## emit a proactive Standby Status Update when ``statusInterval`` is set. ## + ## ``pendingRead`` carries a single in-flight read across calls (``nil`` when + ## none is outstanding). The updated read is returned: still pending after a + ## timed wake, or ``nil`` once it has been consumed. The caller threads the + ## returned value back in on the next call. + ## ## With ``statusInterval == ZeroDuration`` (the default) this blocks until data ## arrives, exactly like a bare ``fillRecvBuf``. ## - ## With a positive ``statusInterval`` under **chronos**, the read is bounded by - ## the time left until the next status update is due; on the resulting - ## ``AsyncTimeoutError`` the read is cleanly cancelled (no bytes consumed, so - ## the stream stays intact) and we return normally, letting the caller send the - ## update and resume. Under **asyncdispatch** a timed read cannot be cancelled — - ## the abandoned read would consume and drop bytes, desyncing the stream — so we - ## fall back to an unbounded read. The caller still emits status updates - ## opportunistically after each received message, which covers a busy stream - ## (where WAL actually accumulates); a fully idle asyncdispatch stream sends - ## nothing until the next message arrives. + ## With a positive ``statusInterval`` under **chronos**, a single background read + ## is raced against a timer sized to the time left until the next status update + ## is due. On a timer wake the read is **left in flight** (never cancelled) and + ## resumed on the next call: cancelling an in-flight transport read and then + ## starting another races chronos' asynchronous cancellation (see + ## ``RecvWatch.cancel``) and can surface as a "Read operation already pending" + ## ``AsyncStreamReadError``. ``fillRecvBufDetached`` commits its bytes to ``recvBuf`` + ## only when awaited, so a read that completes during the timed wait is neither + ## lost nor double-counted. Under **asyncdispatch** there is no timer-bounded + ## wake — a timed read cannot be cancelled, and the abandoned read would consume + ## and drop bytes, desyncing the stream — so it falls back to an unbounded read. + ## The caller still emits status updates opportunistically after each received + ## message, which covers a busy stream (where WAL actually accumulates); a fully + ## idle asyncdispatch stream sends nothing until the next message arrives. if statusInterval <= ZeroDuration: await conn.fillRecvBuf() - return + return nil when hasChronos: - let sinceLast = Moment.now() - lastStatusSent - let remaining = - if sinceLast >= statusInterval: - async_backend.milliseconds(1) - else: - statusInterval - sinceLast - try: - await conn.fillRecvBuf(remaining) - except AsyncTimeoutError: - discard # woke to send a periodic status update, not a transport failure + var read = pendingRead + if read == nil: + read = conn.fillRecvBufDetached() + if not read.finished: + let sinceLast = Moment.now() - lastStatusSent + let remaining = + if sinceLast >= statusInterval: + async_backend.milliseconds(1) + else: + statusInterval - sinceLast + let timer = sleepAsync(remaining) + try: + discard await race(read, timer) + finally: + cancelTimer(timer) + if read.finished: + await read # commit bytes to recvBuf (or re-raise a transport failure) + return nil + return read # timed wake: read still in flight, resume it on the next call else: await conn.fillRecvBuf() + return nil proc maybeSendPeriodicStatus( conn: PgConnection, @@ -918,7 +940,8 @@ proc handleReplicationData( copyData: seq[byte], autoKeepaliveReply: bool, callback: ReplicationCallback, -): Future[void] {.async.} = + lastStatusSent: Moment, +): Future[Moment] {.async.} = ## Process one CopyData frame from a replication stream: parse it, advance the ## received-WAL position on ``XLogData`` (the single source of truth read by ## ``confirmFlushed`` and the auto-reply), emit an automatic keepalive reply on @@ -926,6 +949,12 @@ proc handleReplicationData( ## set, then invoke the user ``callback``. Shared by ``startReplication`` and ## ``startPhysicalReplication`` so the received-tracking and auto-reply logic ## lives in exactly one place. + ## + ## Returns the timestamp to record as ``lastStatusSent``; it is updated when + ## an automatic keepalive reply is sent so that ``statusInterval`` tracks the + ## last time the server saw a Standby Status Update, preventing duplicate + ## proactive updates. + var newLastStatusSent = lastStatusSent let replMsg = parseReplicationMessage(copyData) case replMsg.kind of rmkXLogData: @@ -934,7 +963,9 @@ proc handleReplicationData( of rmkPrimaryKeepalive: if autoKeepaliveReply and replMsg.keepalive.replyRequested: await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn())) + newLastStatusSent = Moment.now() await callback(replMsg) + return newLastStatusSent proc startReplication*( conn: PgConnection, @@ -1096,6 +1127,15 @@ proc startReplication*( # inherit a stale value. conn.resetReplLsnTracking(startLsn) var lastStatusSent = Moment.now() + var pendingRead: Future[void] = nil # in-flight timed read, threaded across waits + when hasChronos: + # If an error unwinds the loop while a timed read is still in flight, abandon + # it so it is not left dangling on the torn-down connection. Normal exits + # (CopyDone / ReadyForQuery) only happen after a read has been consumed, so + # `pendingRead` is nil then and this is a no-op — never a cancel-then-reread. + defer: + if pendingRead != nil and not pendingRead.finished: + pendingRead.cancelSoon() # Streaming loop block recvLoop: @@ -1104,7 +1144,9 @@ proc startReplication*( let msg = opt.get case msg.kind of bmkCopyData: - await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback) + lastStatusSent = await conn.handleReplicationData( + msg.copyData, autoKeepaliveReply, callback, lastStatusSent + ) of bmkCopyDone: # Server ended the stream; reply with CopyDone before draining await conn.sendMsg(@copyDoneMsg) @@ -1126,7 +1168,8 @@ proc startReplication*( ) if conn.state == csClosed: raise newException(PgConnectionError, "Connection closed during replication") - await conn.replFillRecvBuf(statusInterval, lastStatusSent) + pendingRead = + await conn.replFillRecvBuf(statusInterval, lastStatusSent, pendingRead) # Send again after a timed wake with no new data (idle coverage on chronos). lastStatusSent = await conn.maybeSendPeriodicStatus( autoKeepaliveReply, statusInterval, lastStatusSent @@ -1269,6 +1312,13 @@ proc startPhysicalReplication*( # to the resume point so a reused connection does not inherit a stale value. conn.resetReplLsnTracking(startLsn) var lastStatusSent = Moment.now() + var pendingRead: Future[void] = nil # in-flight timed read, threaded across waits + when hasChronos: + # See startReplication: drop a still-in-flight timed read if an error unwinds + # the loop; nil (and so a no-op) on the normal CopyDone / ReadyForQuery exits. + defer: + if pendingRead != nil and not pendingRead.finished: + pendingRead.cancelSoon() block recvLoop: while true: @@ -1276,7 +1326,9 @@ proc startPhysicalReplication*( let msg = opt.get case msg.kind of bmkCopyData: - await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback) + lastStatusSent = await conn.handleReplicationData( + msg.copyData, autoKeepaliveReply, callback, lastStatusSent + ) of bmkCopyDone: await conn.sendMsg(@copyDoneMsg) break recvLoop @@ -1299,7 +1351,8 @@ proc startPhysicalReplication*( raise newException( PgConnectionError, "Connection closed during physical replication" ) - await conn.replFillRecvBuf(statusInterval, lastStatusSent) + pendingRead = + await conn.replFillRecvBuf(statusInterval, lastStatusSent, pendingRead) lastStatusSent = await conn.maybeSendPeriodicStatus( autoKeepaliveReply, statusInterval, lastStatusSent ) diff --git a/tests/test_physical_replication.nim b/tests/test_physical_replication.nim index 9f2790f..fee6e86 100644 --- a/tests/test_physical_replication.nim +++ b/tests/test_physical_replication.nim @@ -52,6 +52,10 @@ var physObservedReceiveLsn: int64 = -1 var physObservedFlushLsn: int64 = -1 var physObservedApplyLsn: int64 = -1 var physObservedReplyMsgType: char = '\0' +var physStatusIntervalReceiveLsn: int64 = -1 +var physStatusIntervalFlushLsn: int64 = -1 +var physStatusIntervalApplyLsn: int64 = -1 +var physStatusIntervalReplyMsgType: char = '\0' suite "Physical replication: SQL building": proc runOneSqlTest( @@ -271,6 +275,83 @@ suite "Physical replication: auto keepalive reply": check physObservedApplyLsn == physReceivedEndLsn check capturedReceivedKinds == @[rmkXLogData, rmkPrimaryKeepalive] +suite "Physical replication: proactive status interval": + test "statusInterval sends a Standby Status without a reply-requested keepalive": + physStatusIntervalReplyMsgType = '\0' + physStatusIntervalReceiveLsn = -1 + physStatusIntervalFlushLsn = -1 + physStatusIntervalApplyLsn = -1 + + const + physStartLsn = 0x0000_0000_0000_1000'i64 + physWalData: seq[byte] = @[1'u8, 2, 3] + physReceivedEndLsn = physStartLsn + physWalData.len + physWalEnd = 0x0000_0000_0000_5000'i64 + physKeepaliveWalEnd = 0x0000_0000_0000_9999'i64 + + proc testBody() {.async.} = + let ms = startMockServer() + + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + discard await drainFrontendMessage(st) # START_REPLICATION + var burst: seq[byte] + burst.add(buildCopyBothResponse()) + # XLogData only — no PrimaryKeepalive(replyRequested=true). + burst.add(buildXLogData(physStartLsn, physWalEnd, 0, physWalData)) + await sendBytes(st, burst) + # Let the status interval (50ms) elapse, then send a non-reply keepalive + # to unblock the asyncdispatch read (which cannot wake on a timer); + # chronos has already emitted updates on its own by now. + await sleepAsync(milliseconds(150)) + await sendBytes( + st, buildKeepalive(physKeepaliveWalEnd, 0, replyRequested = false) + ) + let reply = await drainFrontendMessage(st) + physStatusIntervalReplyMsgType = reply.msgType + if reply.msgType == 'd': + let ssu = decodeStandbyStatus(reply.body) + physStatusIntervalReceiveLsn = ssu.receive + physStatusIntervalFlushLsn = ssu.flush + physStatusIntervalApplyLsn = ssu.apply + # End the stream. Drain any further proactive updates plus the client's + # CopyDone so its CopyDone send sees an open socket. + var tail: seq[byte] + tail.add(buildCopyDone()) + tail.add(buildReadyForQuery('I')) + await sendBytes(st, tail) + while true: + let m = + try: + await drainFrontendMessage(st) + except CatchableError: + break + if m.msgType == 'c': # client's CopyDone + break + await closeClient(st) + + let serverFut = serverHandler() + let conn = await connect(mockConfig(ms.port)) + let cb = makeReplicationCallback: + {.cast(gcsafe).}: + if msg.kind == rmkXLogData: + discard conn.confirmFlushed(msg.xlogData.receivedEndLsn) + + await conn.startPhysicalReplication( + startLsn = Lsn(uint64(physStartLsn)), + statusInterval = milliseconds(50), + callback = cb, + ) + await conn.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check physStatusIntervalReplyMsgType == 'd' + check physStatusIntervalReceiveLsn == physReceivedEndLsn + check physStatusIntervalFlushLsn == physReceivedEndLsn + check physStatusIntervalApplyLsn == physReceivedEndLsn + suite "connectReplication mode": test "rmPhysical sends replication=true in StartupMessage": capturedStartupBody.setLen(0)