From d9f1df135348458dbf412e3a29b74e208f9af144 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Sun, 31 May 2026 21:00:11 +0000 Subject: [PATCH 1/2] test(prepared_statements): fix flaky ruby tests Use pinned transaction instead of flaky multithreaded query execution to test the behaviour of prepared_transaction option and make sure the test request is landing on the another backend connection that hadn't prepared the transaction. --- .../ruby/prepared_disabled/prepared_spec.rb | 49 ++++++++----------- .../ruby/prepared_full/prepared_spec.rb | 44 +++++++---------- integration/ruby/prepared_spec.rb | 45 ++++++++--------- 3 files changed, 59 insertions(+), 79 deletions(-) diff --git a/integration/ruby/prepared_disabled/prepared_spec.rb b/integration/ruby/prepared_disabled/prepared_spec.rb index 2b73f8083..b37047c1e 100644 --- a/integration/ruby/prepared_disabled/prepared_spec.rb +++ b/integration/ruby/prepared_disabled/prepared_spec.rb @@ -44,42 +44,33 @@ conn2.close end - # In transaction pool mode each query can land on a different backend. - # disabled mode forwards Parse and Bind as-is with no global cache, so a - # Bind that arrives on a backend that never saw the Parse fails. - # - # Sequential tests cannot force a crossing: a single connection always - # returns the backend to the LIFO top between queries, so the next query - # gets the same backend. Concurrent threads make the pool hand out both - # backends simultaneously. With 5 threads and pool_size = 2, the pigeonhole - # principle guarantees that at least 3 threads will attempt PREPARE on a - # backend that already holds the statement, producing 'already exists' - # errors — regardless of timing. + # Prepare on backend A. pgdog intercepts bare BEGIN without checking out a + # backend (deferred until the first real query), so a SELECT 1 is needed to + # force the actual checkout and keep backend A pinned in the open transaction. + # exec_prepared then lands on backend B which never saw the Parse. + # In disabled mode pgdog forwards Bind as-is — 'prepared statement does not exist'. # # Mirror: full/'executes named extended-protocol statements in - # transaction pool mode' — same structure, opposite expectation. + # transaction pool mode' — identical structure, opposite expectation. it 'fails named extended-protocol statements in transaction pool mode' do - errors = [] - mutex = Mutex.new - - threads = 5.times.map do - Thread.new do - conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - 20.times { conn.exec_prepared('ext_stmt', [42]) } - rescue PG::Error => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil - end + conn = connect + begin + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + pin = connect + begin + pin.exec('BEGIN') + pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A + expect { conn.exec_prepared('ext_stmt', [42]) }.to raise_error(PG::Error) + ensure + pin.exec('ROLLBACK') rescue nil + pin.close rescue nil end + ensure + conn.close rescue nil end - - threads.each(&:join) - expect(errors).not_to be_empty end + # Same mechanism as the extended-protocol test above, but for the # simple-protocol PREPARE/EXECUTE path. disabled mode forwards the SQL # statement text as-is, so EXECUTE on a backend that never saw the diff --git a/integration/ruby/prepared_full/prepared_spec.rb b/integration/ruby/prepared_full/prepared_spec.rb index 69defacb5..6044d26ee 100644 --- a/integration/ruby/prepared_full/prepared_spec.rb +++ b/integration/ruby/prepared_full/prepared_spec.rb @@ -71,35 +71,29 @@ end # Mirror of disabled/'fails named extended-protocol statements in - # transaction pool mode'. full mode renames each frontend's Parse to an - # internal name (__pgdog_N, unique per frontend) and replays it on any - # backend before the Bind. 5 threads × 20 iterations with pool_size = 2 - # forces genuine crossings \ the replay ensures all succeed. - # Result values are also verified to guard against silent data corruption. + # transaction pool mode' — identical structure, opposite expectation. + # pgdog intercepts bare BEGIN without a pool checkout, so SELECT 1 forces + # the actual checkout keeping backend A pinned. exec_prepared lands on + # backend B; full mode replays the Parse from the global cache — succeeds. it 'executes named extended-protocol statements in transaction pool mode' do - errors = [] - mutex = Mutex.new - - threads = 5.times.map do - Thread.new do - conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - 20.times do |i| - res = conn.exec_prepared('ext_stmt', [i]) - val = res[0]['val'].to_i - raise "expected #{i}, got #{val}" unless val == i - end - rescue => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil + conn = connect + begin + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + pin = connect + begin + pin.exec('BEGIN') + pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A + 10.times do |i| + res = conn.exec_prepared('ext_stmt', [i]) + expect(res[0]['val'].to_i).to eq(i) end + ensure + pin.exec('ROLLBACK') rescue nil + pin.close rescue nil end + ensure + conn.close rescue nil end - - threads.each(&:join) - expect(errors).to be_empty end end diff --git a/integration/ruby/prepared_spec.rb b/integration/ruby/prepared_spec.rb index be23e1aa2..33977d539 100644 --- a/integration/ruby/prepared_spec.rb +++ b/integration/ruby/prepared_spec.rb @@ -47,35 +47,30 @@ conn2.close end - # extended mode renames each frontend's Parse to an internal name - # (__pgdog_N, unique per frontend) and replays it on any backend before - # the Bind. 15 threads × 20 iterations with a default pool of 10 guarantees - # genuine crossings via the pigeonhole principle; the replay ensures all succeed. - # Result values are verified to guard against silent data corruption. + # Mirror of disabled/'fails named extended-protocol statements in + # transaction pool mode' — identical structure, opposite expectation. + # pgdog intercepts bare BEGIN without a pool checkout, so SELECT 1 forces + # the actual checkout keeping backend A pinned. exec_prepared lands on + # backend B; extended mode replays the Parse from the global cache — succeeds. it 'executes named extended-protocol statements in transaction pool mode' do - errors = [] - mutex = Mutex.new - - threads = 15.times.map do - Thread.new do - conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - 20.times do |i| - res = conn.exec_prepared('ext_stmt', [i]) - val = res[0]['val'].to_i - raise "expected #{i}, got #{val}" unless val == i - end - rescue => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil + conn = connect + begin + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + pin = connect + begin + pin.exec('BEGIN') + pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A + 10.times do |i| + res = conn.exec_prepared('ext_stmt', [i]) + expect(res[0]['val'].to_i).to eq(i) end + ensure + pin.exec('ROLLBACK') rescue nil + pin.close rescue nil end + ensure + conn.close rescue nil end - - threads.each(&:join) - expect(errors).to be_empty end # extended mode does NOT intercept SQL PREPARE / EXECUTE — those are From a23a7144a1ea43cfa7a21cae534c172167339825 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 1 Jun 2026 18:05:03 +0000 Subject: [PATCH 2/2] test(prepared_statements): verify multiple queries Instead of relying on a single query after preparing statement for the verification use combined approach that more reliably verifies the expected behaviour. - hold one connection with prepared_statement to verify how other pool connections are affected - run multiple parallel connection to randomize the actually used pool connection and do not rely on LIFO ordering for fetching backend connections from pool --- integration/ruby/Gemfile | 1 + integration/ruby/Gemfile.lock | 1 + integration/ruby/prepared_disabled/pgdog.toml | 2 +- .../ruby/prepared_disabled/prepared_spec.rb | 72 ++++++++----------- integration/ruby/prepared_full/pgdog.toml | 2 +- .../ruby/prepared_full/prepared_spec.rb | 64 +++++++---------- integration/ruby/prepared_spec.rb | 63 +++++++--------- integration/ruby/rspec_helper.rb | 50 +++++++++++++ 8 files changed, 135 insertions(+), 120 deletions(-) diff --git a/integration/ruby/Gemfile b/integration/ruby/Gemfile index 3b81f586f..af7a9c28a 100644 --- a/integration/ruby/Gemfile +++ b/integration/ruby/Gemfile @@ -1,6 +1,7 @@ # frozen_string_literal: true source 'https://rubygems.org' +gem 'concurrent-ruby' gem 'datadog', '~> 2.0' gem 'pg' gem 'rails' diff --git a/integration/ruby/Gemfile.lock b/integration/ruby/Gemfile.lock index ac3bbf6b2..0ab7eb41e 100644 --- a/integration/ruby/Gemfile.lock +++ b/integration/ruby/Gemfile.lock @@ -286,6 +286,7 @@ PLATFORMS x86_64-linux-musl DEPENDENCIES + concurrent-ruby datadog (~> 2.0) pg rails diff --git a/integration/ruby/prepared_disabled/pgdog.toml b/integration/ruby/prepared_disabled/pgdog.toml index e133d29ec..b96c75484 100644 --- a/integration/ruby/prepared_disabled/pgdog.toml +++ b/integration/ruby/prepared_disabled/pgdog.toml @@ -1,6 +1,6 @@ [general] prepared_statements = "disabled" -default_pool_size = 2 +default_pool_size = 5 [[databases]] name = "pgdog" diff --git a/integration/ruby/prepared_disabled/prepared_spec.rb b/integration/ruby/prepared_disabled/prepared_spec.rb index b37047c1e..d4cc7123e 100644 --- a/integration/ruby/prepared_disabled/prepared_spec.rb +++ b/integration/ruby/prepared_disabled/prepared_spec.rb @@ -44,60 +44,44 @@ conn2.close end - # Prepare on backend A. pgdog intercepts bare BEGIN without checking out a - # backend (deferred until the first real query), so a SELECT 1 is needed to - # force the actual checkout and keep backend A pinned in the open transaction. - # exec_prepared then lands on backend B which never saw the Parse. - # In disabled mode pgdog forwards Bind as-is — 'prepared statement does not exist'. - # - # Mirror: full/'executes named extended-protocol statements in - # transaction pool mode' — identical structure, opposite expectation. + # Test that the prepared statement that is created inside the connection + # could not be reliably executed with prepared_statements=false since it + # could land on another backend connection that doesn't have this + # statement prepared. it 'fails named extended-protocol statements in transaction pool mode' do conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - pin = connect - begin - pin.exec('BEGIN') - pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A - expect { conn.exec_prepared('ext_stmt', [42]) }.to raise_error(PG::Error) - ensure - pin.exec('ROLLBACK') rescue nil - pin.close rescue nil + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + + with_pinned_backend do # hold the backend conn prepared on + with_load do + 20.times do + expect { conn.exec_prepared('ext_stmt', [1]) }.to raise_error(PG::Error) + end end - ensure - conn.close rescue nil end + ensure + conn.close rescue nil end - - # Same mechanism as the extended-protocol test above, but for the - # simple-protocol PREPARE/EXECUTE path. disabled mode forwards the SQL - # statement text as-is, so EXECUTE on a backend that never saw the - # PREPARE fails with 'prepared statement does not exist' or, if two - # threads hit the same backend, 'already exists'. - # - # Mirror: full/'rewrites simple-protocol PREPARE / EXECUTE in - # transaction pool mode' — same structure, opposite expectation. + # Test that a statement created with SQL PREPARE inside the connection + # could not be reliably executed with prepared_statements=false since the + # EXECUTE could land on another backend connection that doesn't have this + # statement prepared. it 'fails SQL PREPARE/EXECUTE in transaction pool mode' do - errors = [] - mutex = Mutex.new + conn = connect + # PREPARE and EXECUTE both route to the primary pool (pgdog treats them as + # writes), the same pool the pin holds a backend in. disabled mode never + # replays them, so every EXECUTE forced onto another backend fails. + conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val') - threads = 5.times.map do - Thread.new do - conn = connect - begin - conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2') - 20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") } - rescue PG::Error => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil + with_pinned_backend do # hold the backend conn prepared on + with_load do + 20.times do + expect { conn.exec('EXECUTE sql_stmt(1)') }.to raise_error(PG::Error) end end end - - threads.each(&:join) - expect(errors).not_to be_empty + ensure + conn.close rescue nil end end diff --git a/integration/ruby/prepared_full/pgdog.toml b/integration/ruby/prepared_full/pgdog.toml index 77ec881eb..435914465 100644 --- a/integration/ruby/prepared_full/pgdog.toml +++ b/integration/ruby/prepared_full/pgdog.toml @@ -1,6 +1,6 @@ [general] prepared_statements = "full" -default_pool_size = 2 +default_pool_size = 5 [[databases]] name = "pgdog" diff --git a/integration/ruby/prepared_full/prepared_spec.rb b/integration/ruby/prepared_full/prepared_spec.rb index 6044d26ee..0a23f7a82 100644 --- a/integration/ruby/prepared_full/prepared_spec.rb +++ b/integration/ruby/prepared_full/prepared_spec.rb @@ -29,31 +29,28 @@ conn.close end - # Mirror of disabled/'fails SQL PREPARE/EXECUTE in transaction pool mode'. - # full mode intercepts PREPARE, renames the statement to an internal name - # (__pgdog_N), and replays the PREPARE on any backend that hasn't seen it - # before executing. 5 threads × 20 iterations with pool_size = 2 generates - # the same backend crossings as the disabled test, but all succeed. + # Test that a statement created with SQL PREPARE inside the connection + # can be reliably executed with prepared_statements=full even when the + # EXECUTE lands on another backend connection, because pgdog replays the + # PREPARE onto it. it 'rewrites simple-protocol PREPARE / EXECUTE in transaction pool mode' do - errors = [] - mutex = Mutex.new + conn = connect + # PREPARE and EXECUTE both route to the primary pool (pgdog treats them as + # writes), the same pool the pin holds a backend in, so each EXECUTE is forced + # onto a backend that never saw the PREPARE -- full mode replays it there, so + # they still succeed. + conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val') - threads = 5.times.map do - Thread.new do - conn = connect - begin - conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2') - 20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") } - rescue PG::Error => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil + with_pinned_backend do # hold the backend conn prepared on + with_load do + 20.times do |i| + res = conn.exec("EXECUTE sql_stmt(#{i})") + expect(res[0]['val'].to_i).to eq(i * 2) end end end - - threads.each(&:join) - expect(errors).to be_empty + ensure + conn.close rescue nil end # Session mode gives each client its own dedicated backend, so two session @@ -70,30 +67,23 @@ conn2.close end - # Mirror of disabled/'fails named extended-protocol statements in - # transaction pool mode' — identical structure, opposite expectation. - # pgdog intercepts bare BEGIN without a pool checkout, so SELECT 1 forces - # the actual checkout keeping backend A pinned. exec_prepared lands on - # backend B; full mode replays the Parse from the global cache — succeeds. + # Test that the prepared statement that is created inside the connection + # can be reliably executed with prepared_statements=full even when it lands + # on another backend connection, because pgdog replays the prepare onto it. it 'executes named extended-protocol statements in transaction pool mode' do conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - pin = connect - begin - pin.exec('BEGIN') - pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A - 10.times do |i| + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + + with_pinned_backend do # hold the backend conn prepared on + with_load do + 20.times do |i| res = conn.exec_prepared('ext_stmt', [i]) expect(res[0]['val'].to_i).to eq(i) end - ensure - pin.exec('ROLLBACK') rescue nil - pin.close rescue nil end - ensure - conn.close rescue nil end + ensure + conn.close rescue nil end end diff --git a/integration/ruby/prepared_spec.rb b/integration/ruby/prepared_spec.rb index 33977d539..2af7562bb 100644 --- a/integration/ruby/prepared_spec.rb +++ b/integration/ruby/prepared_spec.rb @@ -47,56 +47,45 @@ conn2.close end - # Mirror of disabled/'fails named extended-protocol statements in - # transaction pool mode' — identical structure, opposite expectation. - # pgdog intercepts bare BEGIN without a pool checkout, so SELECT 1 forces - # the actual checkout keeping backend A pinned. exec_prepared lands on - # backend B; extended mode replays the Parse from the global cache — succeeds. + # Test that the prepared statement that is created inside the connection + # can be reliably executed with prepared_statements=extended even when it + # lands on another backend connection, because pgdog replays the prepare + # onto it. it 'executes named extended-protocol statements in transaction pool mode' do conn = connect - begin - conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') - pin = connect - begin - pin.exec('BEGIN') - pin.exec('SELECT 1') # force actual pool checkout; pin now holds backend A - 10.times do |i| + conn.prepare('ext_stmt', 'SELECT $1::bigint AS val') + + with_pinned_backend do + with_load do + 20.times do |i| res = conn.exec_prepared('ext_stmt', [i]) expect(res[0]['val'].to_i).to eq(i) end - ensure - pin.exec('ROLLBACK') rescue nil - pin.close rescue nil end - ensure - conn.close rescue nil end + ensure + conn.close rescue nil end - # extended mode does NOT intercept SQL PREPARE / EXECUTE — those are - # forwarded as-is. With 15 threads and a default pool of 10, the pigeonhole - # principle guarantees crossings: at least 5 threads hit a backend that - # already holds 'sql_stmt' ('already exists') or one that never saw the - # PREPARE ('does not exist'). Either way, errors accumulate. + # Test that a statement created with SQL PREPARE inside the connection + # could not be reliably executed with prepared_statements=extended, which + # does not intercept SQL PREPARE/EXECUTE, so the EXECUTE could land on + # another backend connection that doesn't have this statement prepared. it 'fails SQL PREPARE/EXECUTE in transaction pool mode' do - errors = [] - mutex = Mutex.new + conn = connect + # PREPARE and EXECUTE both route to the primary pool (pgdog treats them as + # writes), the same pool the pin holds a backend in. extended mode does not + # intercept SQL PREPARE/EXECUTE, so every EXECUTE forced onto another backend fails. + conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val') - threads = 15.times.map do - Thread.new do - conn = connect - begin - conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2') - 20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") } - rescue PG::Error => e - mutex.synchronize { errors << e.message } - ensure - conn.close rescue nil + with_pinned_backend do # hold the backend conn prepared on + with_load do + 20.times do + expect { conn.exec('EXECUTE sql_stmt(1)') }.to raise_error(PG::Error) end end end - - threads.each(&:join) - expect(errors).not_to be_empty + ensure + conn.close rescue nil end end diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index f0c0015d0..d213a73d8 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -6,6 +6,7 @@ require 'toxiproxy' require 'datadog' require 'securerandom' +require 'concurrent' # Configure Datadog APM so ActiveRecord auto-injects sqlcommenter-style # trailing comments (traceparent, tracestate, dddbs, dde, ddps, ddpv, ddh, @@ -54,6 +55,55 @@ def connect(db = 'pgdog', user = 'pgdog') PG.connect(dbname: db, user: user, password: 'pgdog', port: 6432, host: '127.0.0.1') end +# "Pin" a backend server by holding it inside an open transaction so the pool +# cannot hand it to any other connection. Relies on pgdog's LIFO idle-connection +# reuse: call this immediately after the query whose backend you want to hold +# (e.g. a PREPARE), before any other query, so the checkout reclaims that exact +# backend. Lets a test verify how that backend's state affects other connections. +def with_pinned_backend + pin = connect + pin.exec('BEGIN') + # CREATE TEMP TABLE is a write, so pgdog routes this open transaction to the + # primary pool and holds one of its backends. Simple-protocol PREPARE/EXECUTE + # also route to the primary (pgdog treats them as writes), so the pin sits in + # the same pool the EXECUTE checks out from - that is what lets it force the + # EXECUTE onto a different backend. + pin.exec('CREATE TEMP TABLE pin_block (x int)') + yield +ensure + pin.exec('ROLLBACK') rescue nil + pin.close rescue nil +end + +# Runs 3 background connections doing short transactions for the block's +# duration to keep backends busy; stopped and joined when the block ends. +# The churn breaks pgdog's LIFO reuse, so a connection's repeated requests for +# the same statement land on different backends instead of always the same one. +def with_load + running = Concurrent::AtomicBoolean.new(true) + threads = 3.times.map do + Thread.new do + c = nil + begin + c = connect + while running.true? + c.exec('BEGIN') + c.exec('SELECT 1') + c.exec('COMMIT') + end + rescue PG::Error + nil + ensure + c.close rescue nil + end + end + end + yield +ensure + running.make_false + threads.each(&:join) +end + def ensure_done deadline = Time.now + 2 pools = []