Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/ruby/Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

source 'https://rubygems.org'
gem 'concurrent-ruby'
gem 'datadog', '~> 2.0'
gem 'pg'
gem 'rails'
Expand Down
1 change: 1 addition & 0 deletions integration/ruby/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ PLATFORMS
x86_64-linux-musl

DEPENDENCIES
concurrent-ruby
datadog (~> 2.0)
pg
rails
Expand Down
2 changes: 1 addition & 1 deletion integration/ruby/prepared_disabled/pgdog.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[general]
prepared_statements = "disabled"
default_pool_size = 2
default_pool_size = 5

[[databases]]
name = "pgdog"
Expand Down
79 changes: 27 additions & 52 deletions integration/ruby/prepared_disabled/prepared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,69 +44,44 @@
conn2.close
end

# In transaction pool mode each query can land on a different backend.
# disabled mode forwards Parse and Bind as-is with no global cache, so a
# Bind that arrives on a backend that never saw the Parse fails.
#
# Sequential tests cannot force a crossing: a single connection always
# returns the backend to the LIFO top between queries, so the next query
# gets the same backend. Concurrent threads make the pool hand out both
# backends simultaneously. With 5 threads and pool_size = 2, the pigeonhole
# principle guarantees that at least 3 threads will attempt PREPARE on a
# backend that already holds the statement, producing 'already exists'
# errors — regardless of timing.
#
# Mirror: full/'executes named extended-protocol statements in
# transaction pool mode' — same structure, opposite expectation.
# Test that the prepared statement that is created inside the connection
# could not be reliably executed with prepared_statements=false since it
# could land on another backend connection that doesn't have this
# statement prepared.
it 'fails named extended-protocol statements in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it pretty fundamentally changes the semantics of the test. Do we not care about the behavior of multiple competing threads? Can you clarify why we don't care in your commit message or PR description?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the tests and the pr description to address this and other comments related to prepared statements, hope this helps

conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')

threads = 5.times.map do
Thread.new do
conn = connect
begin
conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')
20.times { conn.exec_prepared('ext_stmt', [42]) }
rescue PG::Error => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do # hold the backend conn prepared on
with_load do
20.times do
expect { conn.exec_prepared('ext_stmt', [1]) }.to raise_error(PG::Error)
end
end
end

threads.each(&:join)
expect(errors).not_to be_empty
ensure
conn.close rescue nil
end

# Same mechanism as the extended-protocol test above, but for the
# simple-protocol PREPARE/EXECUTE path. disabled mode forwards the SQL
# statement text as-is, so EXECUTE on a backend that never saw the
# PREPARE fails with 'prepared statement does not exist' or, if two
# threads hit the same backend, 'already exists'.
#
# Mirror: full/'rewrites simple-protocol PREPARE / EXECUTE in
# transaction pool mode' — same structure, opposite expectation.
# Test that a statement created with SQL PREPARE inside the connection
# could not be reliably executed with prepared_statements=false since the
# EXECUTE could land on another backend connection that doesn't have this
# statement prepared.
it 'fails SQL PREPARE/EXECUTE in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
# PREPARE and EXECUTE both route to the primary pool (pgdog treats them as
# writes), the same pool the pin holds a backend in. disabled mode never
# replays them, so every EXECUTE forced onto another backend fails.
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val')

threads = 5.times.map do
Thread.new do
conn = connect
begin
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2')
20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") }
rescue PG::Error => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do # hold the backend conn prepared on
with_load do
20.times do
expect { conn.exec('EXECUTE sql_stmt(1)') }.to raise_error(PG::Error)
end
end
end

threads.each(&:join)
expect(errors).not_to be_empty
ensure
conn.close rescue nil
end
end
2 changes: 1 addition & 1 deletion integration/ruby/prepared_full/pgdog.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[general]
prepared_statements = "full"
default_pool_size = 2
default_pool_size = 5

[[databases]]
name = "pgdog"
Expand Down
74 changes: 29 additions & 45 deletions integration/ruby/prepared_full/prepared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,28 @@
conn.close
end

# Mirror of disabled/'fails SQL PREPARE/EXECUTE in transaction pool mode'.
# full mode intercepts PREPARE, renames the statement to an internal name
# (__pgdog_N), and replays the PREPARE on any backend that hasn't seen it
# before executing. 5 threads × 20 iterations with pool_size = 2 generates
# the same backend crossings as the disabled test, but all succeed.
# Test that a statement created with SQL PREPARE inside the connection
# can be reliably executed with prepared_statements=full even when the
# EXECUTE lands on another backend connection, because pgdog replays the
# PREPARE onto it.
it 'rewrites simple-protocol PREPARE / EXECUTE in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
# PREPARE and EXECUTE both route to the primary pool (pgdog treats them as
# writes), the same pool the pin holds a backend in, so each EXECUTE is forced
# onto a backend that never saw the PREPARE -- full mode replays it there, so
# they still succeed.
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val')

threads = 5.times.map do
Thread.new do
conn = connect
begin
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2')
20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") }
rescue PG::Error => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do # hold the backend conn prepared on
with_load do
20.times do |i|
res = conn.exec("EXECUTE sql_stmt(#{i})")
expect(res[0]['val'].to_i).to eq(i * 2)
end
end
end

threads.each(&:join)
expect(errors).to be_empty
ensure
conn.close rescue nil
end

# Session mode gives each client its own dedicated backend, so two session
Expand All @@ -70,36 +67,23 @@
conn2.close
end

# Mirror of disabled/'fails named extended-protocol statements in
# transaction pool mode'. full mode renames each frontend's Parse to an
# internal name (__pgdog_N, unique per frontend) and replays it on any
# backend before the Bind. 5 threads × 20 iterations with pool_size = 2
# forces genuine crossings \ the replay ensures all succeed.
# Result values are also verified to guard against silent data corruption.
# Test that the prepared statement that is created inside the connection
# can be reliably executed with prepared_statements=full even when it lands
# on another backend connection, because pgdog replays the prepare onto it.
it 'executes named extended-protocol statements in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as the previous comment here. Can you clarify why we no longer care about the behavior with multiple competing threads?

conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')

threads = 5.times.map do
Thread.new do
conn = connect
begin
conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')
20.times do |i|
res = conn.exec_prepared('ext_stmt', [i])
val = res[0]['val'].to_i
raise "expected #{i}, got #{val}" unless val == i
end
rescue => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do # hold the backend conn prepared on
with_load do
20.times do |i|
res = conn.exec_prepared('ext_stmt', [i])
expect(res[0]['val'].to_i).to eq(i)
end
end
end

threads.each(&:join)
expect(errors).to be_empty
ensure
conn.close rescue nil
end

end
72 changes: 28 additions & 44 deletions integration/ruby/prepared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,61 +47,45 @@
conn2.close
end

# extended mode renames each frontend's Parse to an internal name
# (__pgdog_N, unique per frontend) and replays it on any backend before
# the Bind. 15 threads × 20 iterations with a default pool of 10 guarantees
# genuine crossings via the pigeonhole principle; the replay ensures all succeed.
# Result values are verified to guard against silent data corruption.
# Test that the prepared statement that is created inside the connection
# can be reliably executed with prepared_statements=extended even when it
# lands on another backend connection, because pgdog replays the prepare
# onto it.
it 'executes named extended-protocol statements in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here

conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')

threads = 15.times.map do
Thread.new do
conn = connect
begin
conn.prepare('ext_stmt', 'SELECT $1::bigint AS val')
20.times do |i|
res = conn.exec_prepared('ext_stmt', [i])
val = res[0]['val'].to_i
raise "expected #{i}, got #{val}" unless val == i
end
rescue => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do
with_load do
20.times do |i|
res = conn.exec_prepared('ext_stmt', [i])
expect(res[0]['val'].to_i).to eq(i)
end
end
end

threads.each(&:join)
expect(errors).to be_empty
ensure
conn.close rescue nil
end

# extended mode does NOT intercept SQL PREPARE / EXECUTE — those are
# forwarded as-is. With 15 threads and a default pool of 10, the pigeonhole
# principle guarantees crossings: at least 5 threads hit a backend that
# already holds 'sql_stmt' ('already exists') or one that never saw the
# PREPARE ('does not exist'). Either way, errors accumulate.
# Test that a statement created with SQL PREPARE inside the connection
# could not be reliably executed with prepared_statements=extended, which
# does not intercept SQL PREPARE/EXECUTE, so the EXECUTE could land on
# another backend connection that doesn't have this statement prepared.
it 'fails SQL PREPARE/EXECUTE in transaction pool mode' do
errors = []
mutex = Mutex.new
conn = connect
# PREPARE and EXECUTE both route to the primary pool (pgdog treats them as
# writes), the same pool the pin holds a backend in. extended mode does not
# intercept SQL PREPARE/EXECUTE, so every EXECUTE forced onto another backend fails.
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2 AS val')

threads = 15.times.map do
Thread.new do
conn = connect
begin
conn.exec('PREPARE sql_stmt AS SELECT $1::bigint * 2')
20.times { |i| conn.exec("EXECUTE sql_stmt(#{i})") }
rescue PG::Error => e
mutex.synchronize { errors << e.message }
ensure
conn.close rescue nil
with_pinned_backend do # hold the backend conn prepared on
with_load do
20.times do
expect { conn.exec('EXECUTE sql_stmt(1)') }.to raise_error(PG::Error)
end
end
end

threads.each(&:join)
expect(errors).not_to be_empty
ensure
conn.close rescue nil
end
end
50 changes: 50 additions & 0 deletions integration/ruby/rspec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'toxiproxy'
require 'datadog'
require 'securerandom'
require 'concurrent'

# Configure Datadog APM so ActiveRecord auto-injects sqlcommenter-style
# trailing comments (traceparent, tracestate, dddbs, dde, ddps, ddpv, ddh,
Expand Down Expand Up @@ -54,6 +55,55 @@ def connect(db = 'pgdog', user = 'pgdog')
PG.connect(dbname: db, user: user, password: 'pgdog', port: 6432, host: '127.0.0.1')
end

# "Pin" a backend server by holding it inside an open transaction so the pool
# cannot hand it to any other connection. Relies on pgdog's LIFO idle-connection
# reuse: call this immediately after the query whose backend you want to hold
# (e.g. a PREPARE), before any other query, so the checkout reclaims that exact
# backend. Lets a test verify how that backend's state affects other connections.
def with_pinned_backend
pin = connect
pin.exec('BEGIN')
# CREATE TEMP TABLE is a write, so pgdog routes this open transaction to the
# primary pool and holds one of its backends. Simple-protocol PREPARE/EXECUTE
# also route to the primary (pgdog treats them as writes), so the pin sits in
# the same pool the EXECUTE checks out from - that is what lets it force the
# EXECUTE onto a different backend.
pin.exec('CREATE TEMP TABLE pin_block (x int)')
yield
ensure
pin.exec('ROLLBACK') rescue nil
pin.close rescue nil
end

# Runs 3 background connections doing short transactions for the block's
# duration to keep backends busy; stopped and joined when the block ends.
# The churn breaks pgdog's LIFO reuse, so a connection's repeated requests for
# the same statement land on different backends instead of always the same one.
def with_load
running = Concurrent::AtomicBoolean.new(true)
threads = 3.times.map do
Thread.new do
c = nil
begin
c = connect
while running.true?
c.exec('BEGIN')
c.exec('SELECT 1')
c.exec('COMMIT')
end
rescue PG::Error
nil
ensure
c.close rescue nil
end
end
end
yield
ensure
running.make_false
threads.each(&:join)
end

def ensure_done
deadline = Time.now + 2
pools = []
Expand Down
Loading