Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion async_postgres/pg_connection/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ type
## borrower. Raw `acquire` / `release(conn)` callers carry that risk;
## `PooledConnHandle` (its own `released` flag) and the `with*` templates
## are the fully safe paths.
replConfirmedFlushLsnRaw: uint64
## Internal replication state: raw LSN value up to which the application
## has confirmed received WAL is durably flushed during a replication
## stream. Public API users go through `confirmFlushed` /
## `confirmedFlushLsn` in `pg_replication`, which add the `Lsn` typing,
## the monotonic guard, and the received-WAL bound check. Manipulated
## via the exported helpers in this module (see below).
replMaxReceivedLsnRaw: uint64
## Internal replication state: highest WAL position actually received
## from the wire during the current stream (`XLogData` end LSN, not the
## server's `walEnd`). Initialised to the stream's `startLsn` and updated
## by `startReplication` / `startPhysicalReplication`. Used by
## `confirmFlushed` to reject confirmations beyond received WAL.
## Manipulated via the exported helpers in this module (see below).

QueryResult* = object
## Result of a query: field descriptions, row data, and command tag.
Expand Down Expand Up @@ -651,7 +665,53 @@ func effectiveMaxMessageSize*(conn: PgConnection): int {.inline.} =
else:
DefaultMaxBackendMessageLen

# Error builder
# Internal replication LSN plumbing (cross-module use within the library)
#
# `pg_replication` owns the typed `Lsn` API (`confirmFlushed`,
# `confirmedFlushLsn`); `types` stores raw `uint64` values to avoid a dependency
# cycle. The procedures below are exported so `pg_replication` can use them
# without `{.all.}`, but they are intentionally low-level: the public way to
# advance the confirmed-flush position remains `confirmFlushed` in
# `pg_replication`, which adds the `Lsn` typing and the received-WAL bound check.

proc initReplLsnTracking*(conn: PgConnection, startLsn: uint64) =
## Reset the per-stream confirmed-flush and max-received positions to
## `startLsn`. Called at the beginning of each replication stream so a reused
## connection never inherits stale values from a previous stream.
conn.replConfirmedFlushLsnRaw = startLsn
conn.replMaxReceivedLsnRaw = startLsn

proc updateReplMaxReceivedLsn*(conn: PgConnection, received: uint64): bool =
## Advance the highest-received WAL position if `received` is greater than
## the current value. Returns `true` if the value was updated.
if received > conn.replMaxReceivedLsnRaw:
conn.replMaxReceivedLsnRaw = received
true
else:
false

func replConfirmedFlushLsn*(conn: PgConnection): uint64 =
## Raw confirmed-flush LSN. Public API users should use `confirmedFlushLsn`
## in `pg_replication`.
conn.replConfirmedFlushLsnRaw

func replMaxReceivedLsn*(conn: PgConnection): uint64 =
## Raw max-received LSN. Public API users should use the `confirmFlushed`
## bounds via the typed API in `pg_replication`.
conn.replMaxReceivedLsnRaw

proc confirmReplFlushed*(conn: PgConnection, lsn: uint64): bool =
## Clamp `lsn` to the max-received WAL position and advance the
## confirmed-flush LSN only monotonically. Returns `true` if the position
## moved forward. This is the raw, untyped helper used by `confirmFlushed`
## in `pg_replication`; public callers should use that typed API.
let bounded =
if lsn > conn.replMaxReceivedLsnRaw: conn.replMaxReceivedLsnRaw else: lsn
if bounded > conn.replConfirmedFlushLsnRaw:
conn.replConfirmedFlushLsnRaw = bounded
true
else:
false

proc newPgQueryError*(fields: seq[ErrorField]): ref PgQueryError =
## Create a PgQueryError from server ErrorResponse fields.
Expand Down
Loading