Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions async_postgres/pg_replication.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,18 @@ type
UpdateMessage* = object ## Row update.
relationId*: int32
hasOldTuple*: bool ## True if old key/full row is included
keyKind*: char
## When hasOldTuple is true: 'K' if oldTuple holds only the replica
## identity key, 'O' if it holds the full old row (REPLICA IDENTITY FULL).
## '\0' when no old tuple is present.
oldTuple*: seq[TupleField]
newTuple*: seq[TupleField]

DeleteMessage* = object ## Row deletion.
relationId*: int32
keyKind*: char
## 'K' if oldTuple holds only the replica identity key,
## 'O' if it holds the full old row (REPLICA IDENTITY FULL).
oldTuple*: seq[TupleField]

TruncateMessage* = object ## Table truncation.
Expand Down Expand Up @@ -401,7 +408,9 @@ proc parsePgOutputMessage*(data: openArray[byte]): PgOutputMessage =
of 'I': # Insert
var msg = InsertMessage()
msg.relationId = readInt32At(data, 1)
# byte at offset 5 is 'N' (new tuple marker)
let marker = char(readByteAt(data, 5)) # 'N' (new tuple marker)
if marker != 'N':
raise newException(PgProtocolError, "Unknown Insert tuple marker: " & marker)
let (fields, _) = decodeTuple(data, 6)
msg.newTuple = fields
PgOutputMessage(kind: pomkInsert, insert: msg)
Expand All @@ -414,10 +423,15 @@ proc parsePgOutputMessage*(data: openArray[byte]): PgOutputMessage =
if marker == 'K' or marker == 'O':
# Old key or old tuple included
msg.hasOldTuple = true
msg.keyKind = marker
let (oldFields, nextPos) = decodeTuple(data, pos)
msg.oldTuple = oldFields
pos = nextPos
inc pos # skip 'N' marker for new tuple
let newMarker = char(readByteAt(data, pos)) # 'N' (new tuple marker)
if newMarker != 'N':
raise
newException(PgProtocolError, "Unknown Update new tuple marker: " & newMarker)
inc pos
elif marker != 'N':
raise newException(PgProtocolError, "Unknown Update tuple marker: " & marker)
let (newFields, _) = decodeTuple(data, pos)
Expand All @@ -427,7 +441,10 @@ proc parsePgOutputMessage*(data: openArray[byte]): PgOutputMessage =
var msg = DeleteMessage()
msg.relationId = readInt32At(data, 1)
var pos = 5
# byte at offset 5 is 'K' (key) or 'O' (old tuple)
let marker = char(readByteAt(data, pos)) # 'K' (key) or 'O' (old tuple)
if marker != 'K' and marker != 'O':
raise newException(PgProtocolError, "Unknown Delete tuple marker: " & marker)
msg.keyKind = marker
inc pos
let (fields, _) = decodeTuple(data, pos)
msg.oldTuple = fields
Expand Down
94 changes: 94 additions & 0 deletions tests/test_replication.nim
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,20 @@ suite "pgoutput decoder":
check msg.insert.newTuple[0].data == @[byte('4'), byte('2')]
check msg.insert.newTuple[1].kind == tdkNull

test "Insert message with invalid marker raises":
var data: seq[byte]
data.add(byte('I'))
data.addInt32(16384'i32) # relationId
data.add(byte('X')) # invalid marker (should be 'N')
data.addInt16(1'i16)
data.add(byte('t'))
data.addInt32(2'i32)
data.add(byte('4'))
data.add(byte('2'))

expect(PgProtocolError):
discard parsePgOutputMessage(data)

test "Delete message":
var data: seq[byte]
data.add(byte('D'))
Expand All @@ -264,9 +278,40 @@ suite "pgoutput decoder":
let msg = parsePgOutputMessage(data)
check msg.kind == pomkDelete
check msg.delete.relationId == 16384'i32
check msg.delete.keyKind == 'K'
check msg.delete.oldTuple.len == 1
check msg.delete.oldTuple[0].kind == tdkText

test "Delete message with full old tuple":
var data: seq[byte]
data.add(byte('D'))
data.addInt32(16384'i32) # relationId
data.add(byte('O')) # old tuple marker (REPLICA IDENTITY FULL)
data.addInt16(1'i16) # 1 column
data.add(byte('t'))
data.addInt32(2'i32)
data.add(byte('4'))
data.add(byte('2'))

let msg = parsePgOutputMessage(data)
check msg.kind == pomkDelete
check msg.delete.keyKind == 'O'
check msg.delete.oldTuple.len == 1

test "Delete message with invalid marker raises":
var data: seq[byte]
data.add(byte('D'))
data.addInt32(16384'i32) # relationId
data.add(byte('X')) # invalid marker
data.addInt16(1'i16)
data.add(byte('t'))
data.addInt32(2'i32)
data.add(byte('4'))
data.add(byte('2'))

expect(PgProtocolError):
discard parsePgOutputMessage(data)

test "Update message with old tuple":
var data: seq[byte]
data.add(byte('U'))
Expand All @@ -289,11 +334,37 @@ suite "pgoutput decoder":
let msg = parsePgOutputMessage(data)
check msg.kind == pomkUpdate
check msg.update.hasOldTuple == true
check msg.update.keyKind == 'O'
check msg.update.oldTuple.len == 1
check msg.update.oldTuple[0].data == @[byte('o'), byte('l'), byte('d')]
check msg.update.newTuple.len == 1
check msg.update.newTuple[0].data == @[byte('n'), byte('e'), byte('w')]

test "Update message with old key only":
var data: seq[byte]
data.add(byte('U'))
data.addInt32(16384'i32) # relationId
data.add(byte('K')) # old key marker
data.addInt16(1'i16) # 1 column in old key
data.add(byte('t'))
data.addInt32(2'i32)
data.add(byte('4'))
data.add(byte('2'))
data.add(byte('N')) # new tuple marker
data.addInt16(1'i16) # 1 column in new tuple
data.add(byte('t'))
data.addInt32(3'i32)
data.add(byte('n'))
data.add(byte('e'))
data.add(byte('w'))

let msg = parsePgOutputMessage(data)
check msg.kind == pomkUpdate
check msg.update.hasOldTuple == true
check msg.update.keyKind == 'K'
check msg.update.oldTuple.len == 1
check msg.update.newTuple.len == 1

test "Update message without old tuple":
var data: seq[byte]
data.add(byte('U'))
Expand All @@ -308,6 +379,7 @@ suite "pgoutput decoder":
let msg = parsePgOutputMessage(data)
check msg.kind == pomkUpdate
check msg.update.hasOldTuple == false
check msg.update.keyKind == '\0'
check msg.update.oldTuple.len == 0
check msg.update.newTuple.len == 1

Expand All @@ -325,6 +397,28 @@ suite "pgoutput decoder":
expect(PgProtocolError):
discard parsePgOutputMessage(data)

test "Update message with invalid new tuple marker raises":
var data: seq[byte]
data.add(byte('U'))
data.addInt32(16384'i32) # relationId
data.add(byte('O')) # old tuple marker
data.addInt16(1'i16) # 1 column in old tuple
data.add(byte('t'))
data.addInt32(3'i32)
data.add(byte('o'))
data.add(byte('l'))
data.add(byte('d'))
data.add(byte('X')) # invalid new tuple marker (should be 'N')
data.addInt16(1'i16)
data.add(byte('t'))
data.addInt32(3'i32)
data.add(byte('n'))
data.add(byte('e'))
data.add(byte('w'))

expect(PgProtocolError):
discard parsePgOutputMessage(data)

test "Truncate message":
var data: seq[byte]
data.add(byte('T'))
Expand Down