Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions async_postgres/pg_connection/buffer_io.nim
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,35 @@ proc fillRecvBuf*(
conn.recvBuf.setLen(oldLen + data.len)
copyMem(addr conn.recvBuf[oldLen], addr data[0], data.len)

when hasChronos:
proc fillRecvBufDetached*(conn: PgConnection): Future[void] {.async.} =
## Read one chunk into a private scratch buffer and append it to ``recvBuf``
## only once the read settles, leaving ``recvBuf`` parseable while the read is
## still in flight.
##
## ``fillRecvBuf`` grows ``recvBuf`` by ``RecvBufSize`` up front to hand the
## chronos ``readOnce`` a destination pointer, so a caller that parses
## ``recvBuf`` before the read completes would see uninitialised tail bytes.
## The replication status-interval path keeps a single read pending across
## timer wakes and parses between them, so it reads through here instead (see
## ``replFillRecvBuf``). On any read failure the connection is marked
## ``csClosed`` before re-raising, matching ``fillRecvBuf``.
if conn.replReadScratch.len < RecvBufSize:
conn.replReadScratch.setLen(RecvBufSize)
let n =
try:
await conn.reader.readOnce(addr conn.replReadScratch[0], RecvBufSize)
except CatchableError as e:
conn.state = csClosed
raise e
if n == 0:
conn.state = csClosed
raise newException(PgConnectionError, "Connection closed by server")
conn.compactRecvBuf()
let oldLen = conn.recvBuf.len
conn.recvBuf.setLen(oldLen + n)
copyMem(addr conn.recvBuf[oldLen], addr conn.replReadScratch[0], n)

proc nextMessage*(
conn: PgConnection, rowData: RowData = nil, rowCount: ptr int32 = nil
): Option[BackendMessage] {.raises: [PgProtocolError].} =
Expand Down
6 changes: 6 additions & 0 deletions async_postgres/pg_connection/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ type
## by `startReplication` / `startPhysicalReplication`. Used by
## `confirmFlushed` to reject confirmations beyond received WAL.
## Manipulated via the exported helpers in this module (see below).
replReadScratch*: seq[byte]
## Reusable scratch buffer for `fillRecvBufDetached` (chronos only). The
## proactive status-interval path keeps a single read in flight across
## timer wakes; reading into this private buffer (instead of growing
## `recvBuf` up front like `fillRecvBuf`) keeps `recvBuf` parseable while
## that read is still pending. Allocated lazily to `RecvBufSize` and reused.

QueryResult* = object
## Result of a query: field descriptions, row data, and command tag.
Expand Down
170 changes: 165 additions & 5 deletions async_postgres/pg_replication.nim
Original file line number Diff line number Diff line change
Expand Up @@ -850,19 +850,111 @@ proc resetReplLsnTracking(conn: PgConnection, startLsn: Lsn) =
## ``XLogData`` arrives and bounds what ``confirmFlushed`` will accept.
conn.initReplLsnTracking(startLsn.toUInt64)

proc replFillRecvBuf(
conn: PgConnection,
statusInterval: async_backend.Duration,
lastStatusSent: Moment,
pendingRead: Future[void],
): Future[Future[void]] {.async.} =
## Wait for more replication data, but wake early enough that the caller can
## emit a proactive Standby Status Update when ``statusInterval`` is set.
##
## ``pendingRead`` carries a single in-flight read across calls (``nil`` when
## none is outstanding). The updated read is returned: still pending after a
## timed wake, or ``nil`` once it has been consumed. The caller threads the
## returned value back in on the next call.
##
## With ``statusInterval == ZeroDuration`` (the default) this blocks until data
## arrives, exactly like a bare ``fillRecvBuf``.
##
## With a positive ``statusInterval`` under **chronos**, a single background read
## is raced against a timer sized to the time left until the next status update
## is due. On a timer wake the read is **left in flight** (never cancelled) and
## resumed on the next call: cancelling an in-flight transport read and then
## starting another races chronos' asynchronous cancellation (see
## ``RecvWatch.cancel``) and can surface as a "Read operation already pending"
## ``AsyncStreamReadError``. ``fillRecvBufDetached`` commits its bytes to ``recvBuf``
## only when awaited, so a read that completes during the timed wait is neither
## lost nor double-counted. Under **asyncdispatch** there is no timer-bounded
## wake — a timed read cannot be cancelled, and the abandoned read would consume
## and drop bytes, desyncing the stream — so it falls back to an unbounded read.
## The caller still emits status updates opportunistically after each received
## message, which covers a busy stream (where WAL actually accumulates); a fully
## idle asyncdispatch stream sends nothing until the next message arrives.
if statusInterval <= ZeroDuration:
await conn.fillRecvBuf()
return nil
when hasChronos:
var read = pendingRead
if read == nil:
read = conn.fillRecvBufDetached()
if not read.finished:
let sinceLast = Moment.now() - lastStatusSent
let remaining =
if sinceLast >= statusInterval:
async_backend.milliseconds(1)
else:
statusInterval - sinceLast
let timer = sleepAsync(remaining)
try:
discard await race(read, timer)
finally:
cancelTimer(timer)
if read.finished:
await read # commit bytes to recvBuf (or re-raise a transport failure)
return nil
return read # timed wake: read still in flight, resume it on the next call
else:
await conn.fillRecvBuf()
return nil

proc maybeSendPeriodicStatus(
conn: PgConnection,
autoKeepaliveReply: bool,
statusInterval: async_backend.Duration,
lastStatusSent: Moment,
): Future[Moment] {.async.} =
## Emit a proactive Standby Status Update if ``statusInterval`` has elapsed
## since the last one, so ``confirmed_flush_lsn`` advances (and
## ``wal_sender_timeout`` resets) even when the server never requests a reply —
## e.g. a server configured with ``wal_sender_timeout = 0``. The update reports
## the highest received LSN as receive and the ``confirmFlushed`` position as
## flush/apply, identical to the automatic keepalive reply, so it never advances
## flush past WAL the callback has confirmed durable. Returns the timestamp to
## record as the new ``lastStatusSent`` (unchanged when nothing was sent).
##
## Only active together with ``autoKeepaliveReply``: under manual reply
## management the caller owns the cadence and the reported LSNs via
## ``sendStandbyStatus``.
if not autoKeepaliveReply or statusInterval <= ZeroDuration:
return lastStatusSent
if conn.state != csReplicating:
return lastStatusSent
if Moment.now() - lastStatusSent < statusInterval:
return lastStatusSent
await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn()))
return Moment.now()

proc handleReplicationData(
conn: PgConnection,
copyData: seq[byte],
autoKeepaliveReply: bool,
callback: ReplicationCallback,
): Future[void] {.async.} =
lastStatusSent: Moment,
): Future[Moment] {.async.} =
## Process one CopyData frame from a replication stream: parse it, advance the
## received-WAL position on ``XLogData`` (the single source of truth read by
## ``confirmFlushed`` and the auto-reply), emit an automatic keepalive reply on
## a ``PrimaryKeepalive`` with ``replyRequested`` when ``autoKeepaliveReply`` is
## set, then invoke the user ``callback``. Shared by ``startReplication`` and
## ``startPhysicalReplication`` so the received-tracking and auto-reply logic
## lives in exactly one place.
##
## Returns the timestamp to record as ``lastStatusSent``; it is updated when
## an automatic keepalive reply is sent so that ``statusInterval`` tracks the
## last time the server saw a Standby Status Update, preventing duplicate
## proactive updates.
var newLastStatusSent = lastStatusSent
let replMsg = parseReplicationMessage(copyData)
case replMsg.kind
of rmkXLogData:
Expand All @@ -871,14 +963,17 @@ proc handleReplicationData(
of rmkPrimaryKeepalive:
if autoKeepaliveReply and replMsg.keepalive.replyRequested:
await sendConfirmedStatus(conn, Lsn(conn.replMaxReceivedLsn()))
newLastStatusSent = Moment.now()
await callback(replMsg)
return newLastStatusSent

proc startReplication*(
conn: PgConnection,
slotName: string,
startLsn: Lsn = InvalidLsn,
options: seq[(string, string)] = @[],
autoKeepaliveReply: bool = true,
statusInterval: async_backend.Duration = ZeroDuration,
callback: ReplicationCallback,
): Future[void] {.async.} =
## Begin logical replication streaming from the given slot.
Expand Down Expand Up @@ -925,6 +1020,22 @@ proc startReplication*(
## block indefinitely waiting for a flush confirmation that never arrives.
## Call ``confirmFlushed`` promptly (or manage replies manually) in that setup.
##
## **Proactive status interval:** ``statusInterval`` (``ZeroDuration`` = off,
## the default) makes the library send a Standby Status Update on its own at
## least that often, in addition to answering reply-requested keepalives. The
## proactive update reports the highest received LSN as receive and the
## ``confirmFlushed`` position as flush/apply — same as the auto-reply — so it
## advances ``confirmed_flush_lsn`` (letting the server recycle WAL) without
## ever flushing past unconfirmed WAL. Set it when the server uses
## ``wal_sender_timeout = 0`` (or a long timeout): such a server never asks for
## a reply, so without a proactive interval the slot only advances on
## ``stopReplication`` and WAL accumulates meanwhile. It is honoured only when
## ``autoKeepaliveReply`` is true; under manual reply management drive the
## cadence yourself with ``sendStandbyStatus``. Under **asyncdispatch** the
## interval only fires while messages are flowing (it cannot safely interrupt a
## blocked read, so a fully idle stream sends nothing until the next message);
## **chronos** honours it even on a completely idle stream.
##
## If the auto-reply itself fails (for example, the connection is lost
## between receiving the keepalive and writing the Standby Status Update),
## the exception is propagated out of ``startReplication`` and the callback
Expand Down Expand Up @@ -1015,6 +1126,16 @@ proc startReplication*(
# confirmed-flush position to the resume point so a reused connection does not
# inherit a stale value.
conn.resetReplLsnTracking(startLsn)
var lastStatusSent = Moment.now()
var pendingRead: Future[void] = nil # in-flight timed read, threaded across waits
when hasChronos:
# If an error unwinds the loop while a timed read is still in flight, abandon
# it so it is not left dangling on the torn-down connection. Normal exits
# (CopyDone / ReadyForQuery) only happen after a read has been consumed, so
# `pendingRead` is nil then and this is a no-op — never a cancel-then-reread.
defer:
if pendingRead != nil and not pendingRead.finished:
pendingRead.cancelSoon()

# Streaming loop
block recvLoop:
Expand All @@ -1023,7 +1144,9 @@ proc startReplication*(
let msg = opt.get
case msg.kind
of bmkCopyData:
await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback)
lastStatusSent = await conn.handleReplicationData(
msg.copyData, autoKeepaliveReply, callback, lastStatusSent
)
of bmkCopyDone:
# Server ended the stream; reply with CopyDone before draining
await conn.sendMsg(@copyDoneMsg)
Expand All @@ -1038,9 +1161,19 @@ proc startReplication*(
return
else:
discard
# Send a proactive status update if one is due after draining this batch
# (covers a busy stream on both backends).
lastStatusSent = await conn.maybeSendPeriodicStatus(
autoKeepaliveReply, statusInterval, lastStatusSent
)
if conn.state == csClosed:
raise newException(PgConnectionError, "Connection closed during replication")
await conn.fillRecvBuf()
pendingRead =
await conn.replFillRecvBuf(statusInterval, lastStatusSent, pendingRead)
# Send again after a timed wake with no new data (idle coverage on chronos).
lastStatusSent = await conn.maybeSendPeriodicStatus(
autoKeepaliveReply, statusInterval, lastStatusSent
)

# After CopyDone, drain to ReadyForQuery
block drainLoop:
Expand Down Expand Up @@ -1096,6 +1229,7 @@ proc startPhysicalReplication*(
slotName: string = "",
timeline: int32 = 0,
autoKeepaliveReply: bool = true,
statusInterval: async_backend.Duration = ZeroDuration,
callback: ReplicationCallback,
): Future[void] {.async.} =
## Begin **physical** replication streaming.
Expand All @@ -1121,6 +1255,13 @@ proc startPhysicalReplication*(
## manually), or the primary's synchronous ``COMMIT``s will block waiting for a
## flush position that never advances.
##
## ``statusInterval`` behaves as documented on ``startReplication``: a positive
## value makes the standby send a proactive Standby Status Update at least that
## often (receive = highest received, flush/apply = ``confirmFlushed``) so the
## primary can recycle WAL even when it never requests a reply (e.g.
## ``wal_sender_timeout = 0``); it is honoured only with ``autoKeepaliveReply``,
## and on asyncdispatch only fires while messages are flowing.
##
## On a timeline switch the server may send a final result set describing
## the next timeline (``RowDescription`` + ``DataRow`` + ``CommandComplete``)
## between ``CopyDone`` and ``ReadyForQuery``. This proc drains and discards
Expand Down Expand Up @@ -1170,14 +1311,24 @@ proc startPhysicalReplication*(
# position lives on the connection. Reset it and the confirmed-flush position
# to the resume point so a reused connection does not inherit a stale value.
conn.resetReplLsnTracking(startLsn)
var lastStatusSent = Moment.now()
var pendingRead: Future[void] = nil # in-flight timed read, threaded across waits
when hasChronos:
# See startReplication: drop a still-in-flight timed read if an error unwinds
# the loop; nil (and so a no-op) on the normal CopyDone / ReadyForQuery exits.
defer:
if pendingRead != nil and not pendingRead.finished:
pendingRead.cancelSoon()

block recvLoop:
while true:
while (let opt = conn.nextMessage(); opt.isSome):
let msg = opt.get
case msg.kind
of bmkCopyData:
await conn.handleReplicationData(msg.copyData, autoKeepaliveReply, callback)
lastStatusSent = await conn.handleReplicationData(
msg.copyData, autoKeepaliveReply, callback, lastStatusSent
)
of bmkCopyDone:
await conn.sendMsg(@copyDoneMsg)
break recvLoop
Expand All @@ -1191,11 +1342,20 @@ proc startPhysicalReplication*(
return
else:
discard
# See startReplication: send a proactive status update when one is due,
# both after draining a batch and after a timed idle wake.
lastStatusSent = await conn.maybeSendPeriodicStatus(
autoKeepaliveReply, statusInterval, lastStatusSent
)
if conn.state == csClosed:
raise newException(
PgConnectionError, "Connection closed during physical replication"
)
await conn.fillRecvBuf()
pendingRead =
await conn.replFillRecvBuf(statusInterval, lastStatusSent, pendingRead)
lastStatusSent = await conn.maybeSendPeriodicStatus(
autoKeepaliveReply, statusInterval, lastStatusSent
)

# After CopyDone, drain to ReadyForQuery. Accept the optional timeline-switch
# result set (RowDescription + DataRow + CommandComplete) the server emits
Expand Down
Loading