diff --git a/pgdog/src/backend/replication/logical/publisher/slot.rs b/pgdog/src/backend/replication/logical/publisher/slot.rs index dbc1997c1..0aa3232ca 100644 --- a/pgdog/src/backend/replication/logical/publisher/slot.rs +++ b/pgdog/src/backend/replication/logical/publisher/slot.rs @@ -1,5 +1,6 @@ use super::super::status::ReplicationSlot as ReplicationSlotTracker; use super::super::Error; +use crate::config::config; use crate::{ backend::{self, pool::Address, ConnectReason, Server, ServerOptions}, frontend::client::query_engine::two_pc::TwoPcTransactions, @@ -9,6 +10,8 @@ use crate::{ }, util::random_string, }; + +use pgdog_config::CopyFormat; use std::{fmt::Display, str::FromStr, time::Duration}; use tokio::time::timeout; use tracing::{debug, info, trace, warn}; @@ -293,10 +296,11 @@ impl ReplicationSlot { /// Start replication. pub async fn start_replication(&mut self) -> Result<(), Error> { + let is_binary = config().config.general.resharding_copy_format == CopyFormat::Binary; // TODO: This is definitely Postgres version-specific. 
let query = Query::new(format!( - r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '4', origin 'any', "publication_names" '"{}"')"#, - self.name, self.lsn, self.publication + r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '4', origin 'any', "publication_names" '"{}"', "binary" '{}')"#, + self.name, self.lsn, self.publication, is_binary )); self.server()?.send(&vec![query.into()].into()).await?; @@ -528,7 +532,9 @@ mod test { assert_eq!(relation.name, "test_slot_replication") } XLogPayload::Insert(insert) => { - assert_eq!(insert.column(0).unwrap().as_str().unwrap(), "1") + let col = insert.column(0).unwrap(); + let id = i64::from_be_bytes(col.data[..].try_into().unwrap()); + assert_eq!(id, 1); } XLogPayload::Begin(_) => (), XLogPayload::Commit(_) => got_row = true, diff --git a/pgdog/src/backend/replication/logical/subscriber/context.rs b/pgdog/src/backend/replication/logical/subscriber/context.rs index 3185a4ff4..ec2a081c8 100644 --- a/pgdog/src/backend/replication/logical/subscriber/context.rs +++ b/pgdog/src/backend/replication/logical/subscriber/context.rs @@ -137,6 +137,30 @@ mod test { assert!(matches!(shard.shard(), Shard::Direct(_))); } + #[test] + fn test_stream_context_binary_shard_key() { + // Binary-format shard key must flow through to_bind() and reach the router + // with Format::Binary preserved so sharding hashes the binary bigint correctly. 
+ let cluster = Cluster::new_test(&config()); + let id: i64 = 1; + let tuple = TupleData { + columns: vec![Column { + identifier: Identifier::Format(Format::Binary), + len: 8, + data: Bytes::copy_from_slice(&id.to_be_bytes()), + }], + }; + let parse = Parse::new_anonymous("INSERT INTO sharded (id) VALUES ($1)"); + + let ctx = StreamContext::new(&cluster, &tuple, &parse).unwrap(); + assert_eq!(ctx.bind().parameter_format(0).unwrap(), Format::Binary); + assert_eq!( + ctx.bind().parameter(0).unwrap().unwrap().data(), + &id.to_be_bytes() + ); + assert!(matches!(ctx.shard(), Shard::Direct(_))); + } + // Verify that $N in the generated SQL matches the bind slot to_bind() places // the corresponding value into, for each DML operation and tuple shape. diff --git a/pgdog/src/net/messages/replication/logical/tuple_data.rs b/pgdog/src/net/messages/replication/logical/tuple_data.rs index 1cdfe2b15..fa1370bb7 100644 --- a/pgdog/src/net/messages/replication/logical/tuple_data.rs +++ b/pgdog/src/net/messages/replication/logical/tuple_data.rs @@ -1,6 +1,7 @@ use std::str::from_utf8; use bytes::BytesMut; +use tracing::warn; use crate::net::bind::Parameter; use crate::net::Bind; @@ -96,18 +97,22 @@ impl TupleData { /// Used by [`Table`](crate::backend::replication::logical::publisher::Table) DML methods /// — the `$N` they emit must agree with the column ordering of the tuple passed here. 
 pub fn to_bind(&self, name: &str) -> Bind { - let params = self + let (params, codes): (Vec<_>, Vec<_>) = self .columns .iter() - .map(|c| { - if c.identifier == Identifier::Null { - Parameter::new_null() - } else { - Parameter::new(&c.data) + .map(|c| match &c.identifier { + Identifier::Null => (Parameter::new_null(), Format::Text), + Identifier::Toasted => { + warn!( + "to_bind: toasted column reached Bind construction; \ + caller should strip or fill toasted columns first — sending NULL" + ); + (Parameter::new_null(), Format::Text) } + Identifier::Format(fmt) => (Parameter::new(&c.data), *fmt), }) - .collect::<Vec<_>>(); - Bind::new_params(name, &params) + .unzip(); + Bind::new_params_codes(name, &params, &codes) } /// Does this tuple contain any unchanged-TOAST (`'u'`) column? @@ -265,6 +270,15 @@ pub(crate) fn toasted_col() -> Column { } } +#[cfg(test)] +pub(crate) fn binary_col(data: &[u8]) -> Column { + Column { + identifier: Identifier::Format(Format::Binary), + len: data.len() as i32, + data: bytes::Bytes::copy_from_slice(data), + } +} + #[cfg(test)] mod test { use super::*; @@ -393,4 +407,65 @@ mod test { "column count mismatch must return Err, not panic" ); } + + #[test] + fn to_bind_all_text_columns_produce_text_format_codes() { + let tuple = TupleData { + columns: vec![text_col("hello"), text_col("world")], + }; + let bind = tuple.to_bind("__pgdog_1"); + assert_eq!(bind.parameter_format(0).unwrap(), Format::Text); + assert_eq!(bind.parameter_format(1).unwrap(), Format::Text); + } + + #[test] + fn to_bind_binary_columns_produce_binary_format_codes() { + // Simulate a bigint (8 bytes, big-endian) arriving as a binary column. + let val: i64 = 42; + let tuple = TupleData { + columns: vec![binary_col(&val.to_be_bytes())], + }; + let bind = tuple.to_bind("__pgdog_1"); + assert_eq!(bind.parameter_format(0).unwrap(), Format::Binary); + // Data must be forwarded verbatim — the destination decodes it as binary. 
+ assert_eq!( + bind.parameter(0).unwrap().unwrap().data(), + &val.to_be_bytes() + ); + } + + #[test] + fn to_bind_null_and_toasted_use_text_format_code() { + let tuple = TupleData { + columns: vec![ + Column { + identifier: Identifier::Null, + len: -1, + data: Bytes::new(), + }, + toasted_col(), + ], + }; + let bind = tuple.to_bind("__pgdog_1"); + // Format code is irrelevant for absent values, but must not be Binary + // to avoid confusing the destination server. + assert_eq!(bind.parameter_format(0).unwrap(), Format::Text); + assert_eq!(bind.parameter_format(1).unwrap(), Format::Text); + } + + #[test] + fn to_bind_mixed_columns_produce_per_column_format_codes() { + let val: i32 = 99; + let tuple = TupleData { + columns: vec![ + text_col("label"), + binary_col(&val.to_be_bytes()), + text_col("suffix"), + ], + }; + let bind = tuple.to_bind("__pgdog_1"); + assert_eq!(bind.parameter_format(0).unwrap(), Format::Text); + assert_eq!(bind.parameter_format(1).unwrap(), Format::Binary); + assert_eq!(bind.parameter_format(2).unwrap(), Format::Text); + } }