From dd661e2797812769d0c40457e24302ee3c402f23 Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 10:25:36 -0500 Subject: [PATCH 1/6] feat: add SqliteRwPool for single-writer, multi-reader connection pooling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds SqliteRwPool, a connection pool that maintains a single writer and multiple readers for SQLite WAL-mode databases. Queries are automatically routed based on SQL analysis — SELECTs, EXPLAINs, read-only PRAGMAs, and read-only WITH CTEs go to readers; everything else goes to the writer. Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 5 + sqlx-sqlite/src/lib.rs | 3 + sqlx-sqlite/src/rw_pool.rs | 715 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- tests/sqlite/rw_pool.rs | 341 ++++++++++++++++++ 5 files changed, 1066 insertions(+), 1 deletion(-) create mode 100644 sqlx-sqlite/src/rw_pool.rs create mode 100644 tests/sqlite/rw_pool.rs diff --git a/Cargo.toml b/Cargo.toml index c88ab231e2..667c53b15d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -326,6 +326,11 @@ name = "sqlite-error" path = "tests/sqlite/error.rs" required-features = ["sqlite"] +[[test]] +name = "sqlite-rw-pool" +path = "tests/sqlite/rw_pool.rs" +required-features = ["sqlite"] + [[test]] name = "sqlite-sqlcipher" path = "tests/sqlite/sqlcipher.rs" diff --git a/sqlx-sqlite/src/lib.rs b/sqlx-sqlite/src/lib.rs index 17b4bed1ca..e7d37ea2d9 100644 --- a/sqlx-sqlite/src/lib.rs +++ b/sqlx-sqlite/src/lib.rs @@ -133,6 +133,9 @@ mod migrate; #[cfg(feature = "migrate")] mod testing; +mod rw_pool; +pub use rw_pool::{SqliteRwPool, SqliteRwPoolOptions}; + /// An alias for [`Pool`][crate::pool::Pool], specialized for SQLite. pub type SqlitePool = crate::pool::Pool; diff --git a/sqlx-sqlite/src/rw_pool.rs b/sqlx-sqlite/src/rw_pool.rs new file mode 100644 index 0000000000..93ac855cac --- /dev/null +++ b/sqlx-sqlite/src/rw_pool.rs @@ -0,0 +1,715 @@ +use crate::options::{SqliteJournalMode, SqliteSynchronous}; +use crate::{Sqlite, SqliteConnectOptions, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo}; + +use sqlx_core::acquire::Acquire; +use sqlx_core::error::{BoxDynError, Error}; +use sqlx_core::executor::{Execute, Executor}; +use sqlx_core::pool::{MaybePoolConnection, Pool, PoolConnection, PoolOptions}; +use sqlx_core::sql_str::SqlStr; +use sqlx_core::transaction::Transaction; +use sqlx_core::Either; + +use futures_core::future::BoxFuture; +use futures_core::stream::BoxStream; +use futures_util::TryStreamExt; + +use std::fmt; + +// ─── SQL Classification ──────────────────────────────────────────────────────── + +/// Split `sql` into "words" (maximal runs of alphanumeric/underscore characters). +fn sql_words(sql: &str) -> impl Iterator { + sql.split(|c: char| !c.is_ascii_alphanumeric() && c != '_') + .filter(|w| !w.is_empty()) +} + +/// Check if any word in `sql` matches `keyword` (case-insensitive). +fn has_keyword(sql: &str, keyword: &str) -> bool { + sql_words(sql).any(|w| w.eq_ignore_ascii_case(keyword)) +} + +/// Determines whether a SQL statement should be routed to the read pool. +/// +/// Uses conservative heuristics — only definitively read-only statements +/// return `true`. Everything ambiguous routes to the writer (safe default). +/// +/// ## Routing rules +/// +/// - `SELECT` / `EXPLAIN` → always reader (cannot write in SQLite) +/// - `PRAGMA` without `=` → reader (read-only PRAGMA) +/// - `WITH` CTE → reader **only if** no write keywords appear anywhere +/// - Everything else → writer +#[inline] +pub(crate) fn is_read_only_sql(sql: &str) -> bool { + let first = match sql_words(sql).next() { + Some(w) => w, + None => return false, + }; + + if first.eq_ignore_ascii_case("SELECT") || first.eq_ignore_ascii_case("EXPLAIN") { + return true; + } + + if first.eq_ignore_ascii_case("PRAGMA") && !sql.contains('=') { + return true; + } + + // WITH CTEs: read-only if no write keywords appear anywhere. + // Safe false negatives: write keywords in string literals → routes to writer. + if first.eq_ignore_ascii_case("WITH") { + return !has_keyword(sql, "INSERT") + && !has_keyword(sql, "UPDATE") + && !has_keyword(sql, "DELETE") + && !has_keyword(sql, "REPLACE"); + } + + false +} + +// ─── SqliteRwPoolOptions ─────────────────────────────────────────────────────── + +/// Builder for [`SqliteRwPool`]. +/// +/// Provides full control over both the reader and writer pools, including +/// independent [`SqliteConnectOptions`] and [`PoolOptions`] for each. +/// +/// # Example +/// +/// ```rust,no_run +/// # async fn example() -> sqlx::Result<()> { +/// use sqlx::sqlite::{SqliteRwPoolOptions, SqliteConnectOptions}; +/// use sqlx::pool::PoolOptions; +/// use std::time::Duration; +/// +/// let pool = SqliteRwPoolOptions::new() +/// .max_readers(4) +/// .writer_pool_options( +/// PoolOptions::new().acquire_timeout(Duration::from_secs(10)) +/// ) +/// .connect("sqlite://data.db").await?; +/// # Ok(()) +/// # } +/// ``` +pub struct SqliteRwPoolOptions { + max_readers: Option, + reader_connect_options: Option, + writer_connect_options: Option, + reader_pool_options: Option>, + writer_pool_options: Option>, + auto_route: bool, + checkpoint_on_close: bool, +} + +impl Default for SqliteRwPoolOptions { + fn default() -> Self { + Self::new() + } +} + +impl SqliteRwPoolOptions { + /// Create a new `SqliteRwPoolOptions` with sensible defaults. + /// + /// Defaults: + /// - `max_readers`: number of available CPUs (or 4 if unavailable) + /// - `auto_route`: `true` + /// - `checkpoint_on_close`: `true` + pub fn new() -> Self { + Self { + max_readers: None, + reader_connect_options: None, + writer_connect_options: None, + reader_pool_options: None, + writer_pool_options: None, + auto_route: true, + checkpoint_on_close: true, + } + } + + /// Set the maximum number of reader connections. + /// + /// Defaults to the number of available CPUs. + pub fn max_readers(mut self, max: u32) -> Self { + self.max_readers = Some(max); + self + } + + /// Override the [`SqliteConnectOptions`] used for reader connections. + /// + /// WAL journal mode and `read_only(true)` will still be applied on top. + pub fn reader_connect_options(mut self, opts: SqliteConnectOptions) -> Self { + self.reader_connect_options = Some(opts); + self + } + + /// Override the [`SqliteConnectOptions`] used for the writer connection. + /// + /// WAL journal mode and `synchronous(Normal)` will still be applied on top. + pub fn writer_connect_options(mut self, opts: SqliteConnectOptions) -> Self { + self.writer_connect_options = Some(opts); + self + } + + /// Override the [`PoolOptions`] used for the reader pool. + /// + /// `max_connections` will be overridden by [`max_readers`](Self::max_readers) + /// if also set. + pub fn reader_pool_options(mut self, opts: PoolOptions) -> Self { + self.reader_pool_options = Some(opts); + self + } + + /// Override the [`PoolOptions`] used for the writer pool. + /// + /// `max_connections` is always forced to 1 for the writer pool. + pub fn writer_pool_options(mut self, opts: PoolOptions) -> Self { + self.writer_pool_options = Some(opts); + self + } + + /// Enable or disable automatic SQL-based routing. + /// + /// When enabled (the default), the [`Executor`] impl inspects each query's SQL + /// to decide whether to use the reader or writer pool. When disabled, all + /// queries go to the writer; readers are only used via [`SqliteRwPool::reader()`]. + pub fn auto_route(mut self, auto_route: bool) -> Self { + self.auto_route = auto_route; + self + } + + /// Run `PRAGMA wal_checkpoint(PASSIVE)` on close. + /// + /// Enabled by default. This flushes as much WAL data as possible to the + /// main database file without blocking. + pub fn checkpoint_on_close(mut self, checkpoint: bool) -> Self { + self.checkpoint_on_close = checkpoint; + self + } + + /// Create the pool by parsing a connection URL. + pub async fn connect(self, url: &str) -> Result { + let options: SqliteConnectOptions = url.parse()?; + self.connect_with(options).await + } + + /// Create the pool from explicit [`SqliteConnectOptions`]. + /// + /// The writer pool is created first to ensure WAL mode is established + /// before any readers connect. + pub async fn connect_with( + self, + base_options: SqliteConnectOptions, + ) -> Result { + let num_cpus = std::thread::available_parallelism() + .map(|n| n.get() as u32) + .unwrap_or(4); + + // Configure writer: WAL mode + synchronous(Normal) + let writer_opts = self + .writer_connect_options + .unwrap_or_else(|| base_options.clone()) + .journal_mode(SqliteJournalMode::Wal) + .synchronous(SqliteSynchronous::Normal); + + // Configure reader: read_only only. + // WAL mode is NOT set here because the reader connection is opened with + // SQLITE_OPEN_READONLY, and `PRAGMA journal_mode = wal` is a write operation + // that would deadlock on a read-only connection. The writer already ensures + // WAL mode is active on the database file; readers inherit it automatically. + let reader_opts = self + .reader_connect_options + .unwrap_or_else(|| base_options) + .read_only(true); + + // Writer pool: always exactly 1 connection + let writer_pool_opts = self + .writer_pool_options + .unwrap_or_else(PoolOptions::new) + .max_connections(1); + + // Reader pool: configurable, defaults to num_cpus + let max_readers = self.max_readers.unwrap_or(num_cpus); + let reader_pool_opts = self + .reader_pool_options + .unwrap_or_else(PoolOptions::new) + .max_connections(max_readers); + + // Create writer pool FIRST — establishes WAL mode on the database file + let write_pool = writer_pool_opts.connect_with(writer_opts).await?; + + // Then create reader pool + let read_pool = reader_pool_opts.connect_with(reader_opts).await?; + + Ok(SqliteRwPool { + read_pool, + write_pool, + auto_route: self.auto_route, + checkpoint_on_close: self.checkpoint_on_close, + }) + } +} + +// ─── SqliteRwPool ────────────────────────────────────────────────────────────── + +/// A single-writer, multi-reader connection pool for SQLite. +/// +/// SQLite only allows one writer at a time. When multiple connections compete +/// for the write lock, you get busy timeouts and performance degradation. +/// `SqliteRwPool` solves this by maintaining: +/// +/// - A **writer pool** with a single connection for all write operations +/// - A **reader pool** with multiple read-only connections for queries +/// +/// # Auto-Routing +/// +/// When enabled (the default), the [`Executor`] impl inspects each query's SQL +/// and routes it to the appropriate pool: +/// +/// - `SELECT`, `EXPLAIN`, read-only `PRAGMA` → reader pool +/// - `WITH` CTEs without write keywords → reader pool +/// - Everything else → writer pool +/// +/// # WAL Mode +/// +/// This pool requires and automatically configures +/// [WAL mode](https://www.sqlite.org/wal.html), which allows concurrent +/// readers alongside a single writer. +/// +/// # Important +/// +/// You must call [`close()`](SqliteRwPool::close) explicitly for the WAL +/// checkpoint to run. Dropping the pool without calling `close()` will skip +/// the checkpoint, even though `checkpoint_on_close` is enabled by default. +/// The checkpoint uses `PASSIVE` mode, which flushes as much WAL data as +/// possible without blocking. +/// +/// # Example +/// +/// ```rust,no_run +/// # async fn example() -> sqlx::Result<()> { +/// use sqlx::sqlite::SqliteRwPool; +/// +/// let pool = SqliteRwPool::connect("sqlite://data.db").await?; +/// +/// // SELECT → automatically routed to reader pool +/// let rows = sqlx::query("SELECT * FROM users") +/// .fetch_all(&pool).await?; +/// +/// // INSERT → automatically routed to writer pool +/// sqlx::query("INSERT INTO users (name) VALUES (?)") +/// .bind("Alice") +/// .execute(&pool).await?; +/// +/// pool.close().await; +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone)] +pub struct SqliteRwPool { + read_pool: Pool, + write_pool: Pool, + auto_route: bool, + checkpoint_on_close: bool, +} + +impl SqliteRwPool { + /// Create a pool with default options by parsing a connection URL. + /// + /// Equivalent to `SqliteRwPoolOptions::new().connect(url)`. + pub async fn connect(url: &str) -> Result { + SqliteRwPoolOptions::new().connect(url).await + } + + /// Create a pool with default options from explicit connect options. + /// + /// Equivalent to `SqliteRwPoolOptions::new().connect_with(options)`. + pub async fn connect_with(options: SqliteConnectOptions) -> Result { + SqliteRwPoolOptions::new().connect_with(options).await + } + + /// Get a reference to the underlying reader pool. + /// + /// Useful for explicitly routing queries to readers, bypassing auto-routing. + /// + /// # Note + /// + /// Attempting to execute a write statement on a reader connection will + /// return a `SQLITE_READONLY` error from SQLite. + pub fn reader(&self) -> &Pool { + &self.read_pool + } + + /// Get a reference to the underlying writer pool. + pub fn writer(&self) -> &Pool { + &self.write_pool + } + + /// Acquire a read-only connection from the reader pool. + pub fn acquire_reader( + &self, + ) -> impl std::future::Future, Error>> + 'static { + self.read_pool.acquire() + } + + /// Acquire a writable connection from the writer pool. + pub fn acquire_writer( + &self, + ) -> impl std::future::Future, Error>> + 'static { + self.write_pool.acquire() + } + + /// Start a transaction on the writer pool. + pub async fn begin(&self) -> Result, Error> { + let conn = self.write_pool.acquire().await?; + Transaction::begin(MaybePoolConnection::PoolConnection(conn), None).await + } + + /// Start a transaction on the writer pool with a custom `BEGIN` statement. + pub async fn begin_with( + &self, + statement: impl sqlx_core::sql_str::SqlSafeStr, + ) -> Result, Error> { + let conn = self.write_pool.acquire().await?; + Transaction::begin( + MaybePoolConnection::PoolConnection(conn), + Some(statement.into_sql_str()), + ) + .await + } + + /// Shut down the pool. + /// + /// If `checkpoint_on_close` is enabled (the default), closes all reader + /// connections first, then runs `PRAGMA wal_checkpoint(PASSIVE)` on the + /// writer to flush as much WAL data as possible to the main database file. + pub async fn close(&self) { + // Close readers first so the checkpoint isn't blocked by active readers. + self.read_pool.close().await; + + if self.checkpoint_on_close && !self.write_pool.is_closed() { + if let Ok(mut conn) = self.write_pool.acquire().await { + // Best-effort WAL checkpoint + let _ = Executor::execute( + &mut *conn, + "PRAGMA wal_checkpoint(PASSIVE)", + ) + .await; + } + } + + self.write_pool.close().await; + } + + /// Returns `true` if either pool has been closed. + pub fn is_closed(&self) -> bool { + self.write_pool.is_closed() || self.read_pool.is_closed() + } + + /// Returns the number of active reader connections (including idle). + pub fn num_readers(&self) -> u32 { + self.read_pool.size() + } + + /// Returns the number of idle reader connections. + pub fn num_idle_readers(&self) -> usize { + self.read_pool.num_idle() + } + + /// Returns the number of active writer connections (including idle). + pub fn num_writers(&self) -> u32 { + self.write_pool.size() + } + + /// Returns the number of idle writer connections. + pub fn num_idle_writers(&self) -> usize { + self.write_pool.num_idle() + } +} + +impl fmt::Debug for SqliteRwPool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SqliteRwPool") + .field("read_pool", &self.read_pool) + .field("write_pool", &self.write_pool) + .field("auto_route", &self.auto_route) + .field("checkpoint_on_close", &self.checkpoint_on_close) + .finish() + } +} + +// ─── Executor impl ───────────────────────────────────────────────────────────── + +/// Carries decomposed query parts for re-delegation to a connection's [`Executor`] impl. +/// +/// This allows the `SqliteRwPool` to inspect the SQL for routing before delegating +/// execution to the appropriate pool's connection, without coupling to +/// `SqliteConnection` internals. +struct RoutedQuery { + sql: SqlStr, + arguments: Option, + persistent: bool, +} + +impl Execute<'_, Sqlite> for RoutedQuery { + fn sql(self) -> SqlStr { + self.sql + } + + fn statement(&self) -> Option<&SqliteStatement> { + None + } + + fn take_arguments( + &mut self, + ) -> Result, BoxDynError> { + Ok(self.arguments.take()) + } + + fn persistent(&self) -> bool { + self.persistent + } +} + +impl<'p> Executor<'p> for &SqliteRwPool { + type Database = Sqlite; + + fn fetch_many<'e, 'q, E>( + self, + mut query: E, + ) -> BoxStream<'e, Result, Error>> + where + 'p: 'e, + E: Execute<'q, Sqlite>, + 'q: 'e, + E: 'q, + { + let pool = self.clone(); + + Box::pin(try_stream! { + let arguments = query.take_arguments().map_err(Error::Encode)?; + let persistent = query.persistent(); + let sql = query.sql(); + + let use_reader = pool.auto_route && is_read_only_sql(sql.as_str()); + let target_pool = if use_reader { &pool.read_pool } else { &pool.write_pool }; + let mut conn = target_pool.acquire().await?; + + let routed = RoutedQuery { sql, arguments, persistent }; + let mut s = conn.fetch_many(routed); + + while let Some(v) = s.try_next().await? { + r#yield!(v); + } + + Ok(()) + }) + } + + fn fetch_optional<'e, 'q, E>( + self, + mut query: E, + ) -> BoxFuture<'e, Result, Error>> + where + 'p: 'e, + E: Execute<'q, Sqlite>, + 'q: 'e, + E: 'q, + { + let pool = self.clone(); + + Box::pin(async move { + let arguments = query.take_arguments().map_err(Error::Encode)?; + let persistent = query.persistent(); + let sql = query.sql(); + + let use_reader = pool.auto_route && is_read_only_sql(sql.as_str()); + let target_pool = if use_reader { &pool.read_pool } else { &pool.write_pool }; + let mut conn = target_pool.acquire().await?; + + let routed = RoutedQuery { sql, arguments, persistent }; + conn.fetch_optional(routed).await + }) + } + + fn prepare_with<'e>( + self, + sql: SqlStr, + parameters: &'e [SqliteTypeInfo], + ) -> BoxFuture<'e, Result> + where + 'p: 'e, + { + let pool = self.write_pool.clone(); + + Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await }) + } + + #[doc(hidden)] + #[cfg(feature = "offline")] + fn describe<'e>( + self, + sql: SqlStr, + ) -> BoxFuture<'e, Result, Error>> + where + 'p: 'e, + { + let pool = self.write_pool.clone(); + + Box::pin(async move { pool.acquire().await?.describe(sql).await }) + } +} + +// ─── Acquire impl ────────────────────────────────────────────────────────────── + +impl<'a> Acquire<'a> for &SqliteRwPool { + type Database = Sqlite; + type Connection = PoolConnection; + + /// Always acquires from the writer pool. + /// + /// This is the safe default because code using `acquire()` may need to + /// write, and [`sqlx::migrate!().run()`] uses `Acquire` internally. + /// Use [`SqliteRwPool::acquire_reader()`] for explicit read-only access. + fn acquire(self) -> BoxFuture<'static, Result> { + Box::pin(self.write_pool.acquire()) + } + + /// Begins a transaction on the writer pool. + fn begin(self) -> BoxFuture<'static, Result, Error>> { + let pool = self.write_pool.clone(); + + Box::pin(async move { + let conn = pool.acquire().await?; + Transaction::begin(MaybePoolConnection::PoolConnection(conn), None).await + }) + } +} + +// ─── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // SELECT variants + #[test] + fn select_is_read_only() { + assert!(is_read_only_sql("SELECT * FROM users")); + assert!(is_read_only_sql("select * from users")); + assert!(is_read_only_sql("Select 1")); + assert!(is_read_only_sql(" SELECT 1")); + assert!(is_read_only_sql("\n\tSELECT 1")); + assert!(is_read_only_sql("SELECT count(*) FROM orders WHERE status = 'active'")); + } + + // EXPLAIN + #[test] + fn explain_is_read_only() { + assert!(is_read_only_sql("EXPLAIN SELECT 1")); + assert!(is_read_only_sql("EXPLAIN QUERY PLAN SELECT * FROM users")); + assert!(is_read_only_sql("explain query plan select 1")); + } + + // PRAGMA + #[test] + fn pragma_routing() { + // Read-only PRAGMAs (no =) + assert!(is_read_only_sql("PRAGMA journal_mode")); + assert!(is_read_only_sql("PRAGMA table_info(users)")); + assert!(is_read_only_sql("pragma page_count")); + + // Write PRAGMAs (with =) + assert!(!is_read_only_sql("PRAGMA journal_mode = WAL")); + assert!(!is_read_only_sql("PRAGMA synchronous = NORMAL")); + } + + // WITH CTEs + #[test] + fn with_cte_routing() { + // Read-only CTEs + assert!(is_read_only_sql( + "WITH t AS (SELECT 1) SELECT * FROM t" + )); + assert!(is_read_only_sql( + "WITH RECURSIVE cte(n) AS (SELECT 1 UNION ALL SELECT n+1 FROM cte WHERE n < 10) SELECT * FROM cte" + )); + + // Write CTEs + assert!(!is_read_only_sql( + "WITH t AS (SELECT 1) INSERT INTO foo SELECT * FROM t" + )); + assert!(!is_read_only_sql( + "WITH t AS (SELECT 1) UPDATE foo SET bar = 1" + )); + assert!(!is_read_only_sql( + "WITH t AS (SELECT 1) DELETE FROM foo" + )); + assert!(!is_read_only_sql( + "WITH t AS (SELECT 1) REPLACE INTO foo SELECT * FROM t" + )); + } + + // Write operations → writer + #[test] + fn write_operations_are_not_read_only() { + assert!(!is_read_only_sql("INSERT INTO users VALUES (1)")); + assert!(!is_read_only_sql("UPDATE users SET name = 'Bob'")); + assert!(!is_read_only_sql("DELETE FROM users")); + assert!(!is_read_only_sql("REPLACE INTO users VALUES (1)")); + assert!(!is_read_only_sql("CREATE TABLE foo (id INT)")); + assert!(!is_read_only_sql("DROP TABLE foo")); + assert!(!is_read_only_sql("ALTER TABLE foo ADD COLUMN bar INT")); + assert!(!is_read_only_sql("CREATE INDEX idx ON foo(bar)")); + } + + // Transaction control → writer + #[test] + fn transaction_control_is_not_read_only() { + assert!(!is_read_only_sql("BEGIN")); + assert!(!is_read_only_sql("BEGIN TRANSACTION")); + assert!(!is_read_only_sql("COMMIT")); + assert!(!is_read_only_sql("ROLLBACK")); + assert!(!is_read_only_sql("SAVEPOINT sp1")); + } + + // Edge cases + #[test] + fn edge_cases() { + // Empty / whitespace + assert!(!is_read_only_sql("")); + assert!(!is_read_only_sql(" ")); + + // Keywords as substrings should NOT match + assert!(!is_read_only_sql("SELECTIVITY_CHECK()")); + + // ATTACH / DETACH → writer + assert!(!is_read_only_sql("ATTACH DATABASE ':memory:' AS db2")); + assert!(!is_read_only_sql("DETACH DATABASE db2")); + + // False negative: write keyword in string literal → safely routes to writer + assert!(!is_read_only_sql( + "WITH t AS (SELECT 'DELETE') SELECT * FROM t" + )); + } + + // Word boundary checks + #[test] + fn word_boundary_checks() { + // "SELECTED" should not match "SELECT" + assert!(!is_read_only_sql("SELECTED * FROM foo")); + + // "SELECTS" should not match "SELECT" + assert!(!is_read_only_sql("SELECTS something")); + + // "INSERTS" in a WITH should not trigger the INSERT guard + // (but it also won't match as a standalone keyword) + assert!(is_read_only_sql( + "WITH t AS (SELECT * FROM inserts_log) SELECT * FROM t" + )); + + // "UPDATE" as a column name in a table won't match because + // it would need word boundaries + assert!(is_read_only_sql( + "WITH t AS (SELECT updated FROM foo) SELECT * FROM t" + )); + } +} diff --git a/src/lib.rs b/src/lib.rs index 438463210d..f7425ef462 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,7 +62,8 @@ pub use sqlx_postgres::{ #[cfg_attr(docsrs, doc(cfg(feature = "_sqlite")))] #[doc(inline)] pub use sqlx_sqlite::{ - self as sqlite, Sqlite, SqliteConnection, SqliteExecutor, SqlitePool, SqliteTransaction, + self as sqlite, Sqlite, SqliteConnection, SqliteExecutor, SqlitePool, SqliteRwPool, + SqliteRwPoolOptions, SqliteTransaction, }; #[cfg(feature = "any")] diff --git a/tests/sqlite/rw_pool.rs b/tests/sqlite/rw_pool.rs new file mode 100644 index 0000000000..25e50a5bb8 --- /dev/null +++ b/tests/sqlite/rw_pool.rs @@ -0,0 +1,341 @@ +use sqlx::sqlite::{SqliteConnectOptions, SqliteRwPool, SqliteRwPoolOptions}; +use sqlx::{Acquire, Row}; +use std::str::FromStr; +use tempfile::TempDir; + +fn temp_db_opts() -> (SqliteConnectOptions, TempDir) { + let dir = TempDir::new().unwrap(); + let filepath = dir.path().join("test.db"); + let opts = SqliteConnectOptions::from_str(&format!("sqlite://{}", filepath.display())) + .unwrap() + .create_if_missing(true); + (opts, dir) +} + +// ─── Basic connectivity ──────────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_connect_and_close() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + assert!(!pool.is_closed()); + assert!(pool.num_writers() > 0); + + pool.close().await; + assert!(pool.is_closed()); + + Ok(()) +} + +// ─── Auto-routing: SELECT goes to reader, INSERT goes to writer ──────────────── + +#[sqlx_macros::test] +async fn rw_pool_auto_routes_queries() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + // Create table via writer (auto-routed) + sqlx::query("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)") + .execute(&pool) + .await?; + + // INSERT via writer (auto-routed) + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind("Alice") + .execute(&pool) + .await?; + + sqlx::query("INSERT INTO users (name) VALUES (?)") + .bind("Bob") + .execute(&pool) + .await?; + + // SELECT via reader (auto-routed) + let rows = sqlx::query("SELECT name FROM users ORDER BY name") + .fetch_all(&pool) + .await?; + + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].get::("name"), "Alice"); + assert_eq!(rows[1].get::("name"), "Bob"); + + pool.close().await; + Ok(()) +} + +// ─── Transactions always use writer ──────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_transactions_use_writer() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, val TEXT)") + .execute(&pool) + .await?; + + // begin() should use writer + let mut tx = pool.begin().await?; + + sqlx::query("INSERT INTO items (val) VALUES ('a')") + .execute(&mut *tx) + .await?; + sqlx::query("INSERT INTO items (val) VALUES ('b')") + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + let count = sqlx::query_scalar::("SELECT count(*) FROM items") + .fetch_one(&pool) + .await?; + + assert_eq!(count, 2); + + // Test rollback + let mut tx = pool.begin().await?; + sqlx::query("INSERT INTO items (val) VALUES ('c')") + .execute(&mut *tx) + .await?; + tx.rollback().await?; + + let count = sqlx::query_scalar::("SELECT count(*) FROM items") + .fetch_one(&pool) + .await?; + + assert_eq!(count, 2); // still 2, rollback worked + + pool.close().await; + Ok(()) +} + +// ─── Acquire routes to writer ────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_acquire_routes_to_writer() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + // Acquire via the Acquire trait should give writer + let mut conn = (&pool).acquire().await?; + + sqlx::query("CREATE TABLE acq_test (id INTEGER PRIMARY KEY)") + .execute(&mut *conn) + .await?; + + sqlx::query("INSERT INTO acq_test (id) VALUES (1)") + .execute(&mut *conn) + .await?; + + drop(conn); + + let row = sqlx::query("SELECT id FROM acq_test") + .fetch_one(&pool) + .await?; + assert_eq!(row.get::("id"), 1); + + pool.close().await; + Ok(()) +} + +// ─── Explicit reader/writer access ───────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_explicit_reader_writer() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + // Writer: create and populate table + let mut writer = pool.acquire_writer().await?; + sqlx::query("CREATE TABLE expl (id INTEGER PRIMARY KEY, val TEXT)") + .execute(&mut *writer) + .await?; + sqlx::query("INSERT INTO expl (val) VALUES ('hello')") + .execute(&mut *writer) + .await?; + drop(writer); + + // Reader: can read + let mut reader = pool.acquire_reader().await?; + let rows = sqlx::query("SELECT val FROM expl") + .fetch_all(&mut *reader) + .await?; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].get::("val"), "hello"); + drop(reader); + + pool.close().await; + Ok(()) +} + +// ─── Reader rejects writes ───────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_reader_rejects_writes() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + // Setup + sqlx::query("CREATE TABLE ro_test (id INTEGER PRIMARY KEY)") + .execute(&pool) + .await?; + + // Attempting to write via an explicit reader should fail + let mut reader = pool.acquire_reader().await?; + let result = sqlx::query("INSERT INTO ro_test (id) VALUES (1)") + .execute(&mut *reader) + .await; + + assert!(result.is_err(), "write on read-only connection should fail"); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("readonly") || err.contains("read-only") || err.contains("SQLITE_READONLY"), + "expected readonly error, got: {err}" + ); + + drop(reader); + pool.close().await; + Ok(()) +} + +// ─── PRAGMA routing ──────────────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_pragma_routing() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + // Read-only PRAGMA (no =) should succeed (routes to reader) + let row = sqlx::query("PRAGMA journal_mode").fetch_one(&pool).await?; + let mode = row.get::(0); + assert_eq!(mode.to_lowercase(), "wal"); + + pool.close().await; + Ok(()) +} + +// ─── WITH CTE routing ────────────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_with_cte_routing() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPool::connect_with(opts).await?; + + sqlx::query("CREATE TABLE cte_test (id INTEGER PRIMARY KEY, val INTEGER)") + .execute(&pool) + .await?; + sqlx::query("INSERT INTO cte_test (val) VALUES (10), (20), (30)") + .execute(&pool) + .await?; + + // Read-only WITH CTE → reader + let rows = + sqlx::query("WITH t AS (SELECT val FROM cte_test WHERE val > 10) SELECT * FROM t") + .fetch_all(&pool) + .await?; + assert_eq!(rows.len(), 2); + + pool.close().await; + Ok(()) +} + +// ─── Auto-route disabled ─────────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_auto_route_disabled() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPoolOptions::new() + .auto_route(false) + .max_readers(2) + .connect_with(opts) + .await?; + + // Even SELECTs go to writer when auto_route is disabled + sqlx::query("CREATE TABLE no_route (id INTEGER PRIMARY KEY)") + .execute(&pool) + .await?; + + sqlx::query("INSERT INTO no_route (id) VALUES (1)") + .execute(&pool) + .await?; + + let row = sqlx::query("SELECT id FROM no_route") + .fetch_one(&pool) + .await?; + assert_eq!(row.get::("id"), 1); + + // Explicit reader still works + let mut reader = pool.acquire_reader().await?; + let rows = sqlx::query("SELECT id FROM no_route") + .fetch_all(&mut *reader) + .await?; + assert_eq!(rows.len(), 1); + + drop(reader); + pool.close().await; + Ok(()) +} + +// ─── Concurrent reads don't block ────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_concurrent_reads() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPoolOptions::new() + .max_readers(4) + .connect_with(opts) + .await?; + + sqlx::query("CREATE TABLE conc (id INTEGER PRIMARY KEY, val TEXT)") + .execute(&pool) + .await?; + + for i in 0..10 { + sqlx::query("INSERT INTO conc (val) VALUES (?)") + .bind(format!("item_{i}")) + .execute(&pool) + .await?; + } + + // Spawn multiple concurrent reads + let mut handles = Vec::new(); + for _ in 0..4 { + let pool = pool.clone(); + handles.push(tokio::spawn(async move { + let rows = sqlx::query("SELECT count(*) as cnt FROM conc") + .fetch_one(&pool) + .await + .unwrap(); + rows.get::("cnt") + })); + } + + for handle in handles { + let count = handle.await?; + assert_eq!(count, 10); + } + + pool.close().await; + Ok(()) +} + +// ─── Pool size introspection ─────────────────────────────────────────────────── + +#[sqlx_macros::test] +async fn rw_pool_size_introspection() -> anyhow::Result<()> { + let (opts, _dir) = temp_db_opts(); + let pool = SqliteRwPoolOptions::new() + .max_readers(3) + .connect_with(opts) + .await?; + + // Writer pool has exactly 1 connection + assert_eq!(pool.num_writers(), 1); + + // Reader pool may have 1 initially (pools lazily create connections) + assert!(pool.num_readers() >= 1); + + pool.close().await; + Ok(()) +} From 5aaae712835a4b2e8d068be6a03b76c6dc2171fd Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 10:43:32 -0500 Subject: [PATCH 2/6] cargo fmt --- sqlx-sqlite/src/rw_pool.rs | 38 ++++++++++++++++++++------------------ tests/sqlite/rw_pool.rs | 7 +++---- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/sqlx-sqlite/src/rw_pool.rs b/sqlx-sqlite/src/rw_pool.rs index 93ac855cac..6b63f2c50e 100644 --- a/sqlx-sqlite/src/rw_pool.rs +++ b/sqlx-sqlite/src/rw_pool.rs @@ -1,5 +1,7 @@ use crate::options::{SqliteJournalMode, SqliteSynchronous}; -use crate::{Sqlite, SqliteConnectOptions, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo}; +use crate::{ + Sqlite, SqliteConnectOptions, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo, +}; use sqlx_core::acquire::Acquire; use sqlx_core::error::{BoxDynError, Error}; @@ -388,11 +390,7 @@ impl SqliteRwPool { if self.checkpoint_on_close && !self.write_pool.is_closed() { if let Ok(mut conn) = self.write_pool.acquire().await { // Best-effort WAL checkpoint - let _ = Executor::execute( - &mut *conn, - "PRAGMA wal_checkpoint(PASSIVE)", - ) - .await; + let _ = Executor::execute(&mut *conn, "PRAGMA wal_checkpoint(PASSIVE)").await; } } @@ -458,9 +456,7 @@ impl Execute<'_, Sqlite> for RoutedQuery { None } - fn take_arguments( - &mut self, - ) -> Result, BoxDynError> { + fn take_arguments(&mut self) -> Result, BoxDynError> { Ok(self.arguments.take()) } @@ -522,10 +518,18 @@ impl<'p> Executor<'p> for &SqliteRwPool { let sql = query.sql(); let use_reader = pool.auto_route && is_read_only_sql(sql.as_str()); - let target_pool = if use_reader { &pool.read_pool } else { &pool.write_pool }; + let target_pool = if use_reader { + &pool.read_pool + } else { + &pool.write_pool + }; let mut conn = target_pool.acquire().await?; - let routed = RoutedQuery { sql, arguments, persistent }; + let routed = RoutedQuery { + sql, + arguments, + persistent, + }; conn.fetch_optional(routed).await }) } @@ -598,7 +602,9 @@ mod tests { assert!(is_read_only_sql("Select 1")); assert!(is_read_only_sql(" SELECT 1")); assert!(is_read_only_sql("\n\tSELECT 1")); - assert!(is_read_only_sql("SELECT count(*) FROM orders WHERE status = 'active'")); + assert!(is_read_only_sql( + "SELECT count(*) FROM orders WHERE status = 'active'" + )); } // EXPLAIN @@ -626,9 +632,7 @@ mod tests { #[test] fn with_cte_routing() { // Read-only CTEs - assert!(is_read_only_sql( - "WITH t AS (SELECT 1) SELECT * FROM t" - )); + assert!(is_read_only_sql("WITH t AS (SELECT 1) SELECT * FROM t")); assert!(is_read_only_sql( "WITH RECURSIVE cte(n) AS (SELECT 1 UNION ALL SELECT n+1 FROM cte WHERE n < 10) SELECT * FROM cte" )); @@ -640,9 +644,7 @@ mod tests { assert!(!is_read_only_sql( "WITH t AS (SELECT 1) UPDATE foo SET bar = 1" )); - assert!(!is_read_only_sql( - "WITH t AS (SELECT 1) DELETE FROM foo" - )); + assert!(!is_read_only_sql("WITH t AS (SELECT 1) DELETE FROM foo")); assert!(!is_read_only_sql( "WITH t AS (SELECT 1) REPLACE INTO foo SELECT * FROM t" )); diff --git a/tests/sqlite/rw_pool.rs b/tests/sqlite/rw_pool.rs index 25e50a5bb8..016718a58a 100644 --- a/tests/sqlite/rw_pool.rs +++ b/tests/sqlite/rw_pool.rs @@ -230,10 +230,9 @@ async fn rw_pool_with_cte_routing() -> anyhow::Result<()> { .await?; // Read-only WITH CTE → reader - let rows = - sqlx::query("WITH t AS (SELECT val FROM cte_test WHERE val > 10) SELECT * FROM t") - .fetch_all(&pool) - .await?; + let rows = sqlx::query("WITH t AS (SELECT val FROM cte_test WHERE val > 10) SELECT * FROM t") + .fetch_all(&pool) + .await?; assert_eq!(rows.len(), 2); pool.close().await; From efcfaa052c81f74f96d7ffb2142da6a6a984dabb Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 10:49:40 -0500 Subject: [PATCH 3/6] fix: resolve clippy warnings in SqliteRwPool Use try_from instead of truncating cast, unwrap_or instead of unnecessary closure, and unwrap_or_default for default construction. Co-Authored-By: Claude Opus 4.6 --- sqlx-sqlite/src/rw_pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlx-sqlite/src/rw_pool.rs b/sqlx-sqlite/src/rw_pool.rs index 6b63f2c50e..d40a11d803 100644 --- a/sqlx-sqlite/src/rw_pool.rs +++ b/sqlx-sqlite/src/rw_pool.rs @@ -202,7 +202,7 @@ impl SqliteRwPoolOptions { base_options: SqliteConnectOptions, ) -> Result { let num_cpus = std::thread::available_parallelism() - .map(|n| n.get() as u32) + .map(|n| u32::try_from(n.get()).unwrap_or(u32::MAX)) .unwrap_or(4); // Configure writer: WAL mode + synchronous(Normal) @@ -219,20 +219,20 @@ impl SqliteRwPoolOptions { // WAL mode is active on the database file; readers inherit it automatically. let reader_opts = self .reader_connect_options - .unwrap_or_else(|| base_options) + .unwrap_or(base_options) .read_only(true); // Writer pool: always exactly 1 connection let writer_pool_opts = self .writer_pool_options - .unwrap_or_else(PoolOptions::new) + .unwrap_or_default() .max_connections(1); // Reader pool: configurable, defaults to num_cpus let max_readers = self.max_readers.unwrap_or(num_cpus); let reader_pool_opts = self .reader_pool_options - .unwrap_or_else(PoolOptions::new) + .unwrap_or_default() .max_connections(max_readers); // Create writer pool FIRST — establishes WAL mode on the database file From fc46b24b6637d3b3d9f3ecdafdda591fd34f81a0 Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 11:01:25 -0500 Subject: [PATCH 4/6] fix: use runtime-agnostic spawn in rw_pool concurrent reads test Replace tokio::spawn with sqlx_core::rt::spawn so the test works with any async runtime (e.g. async-global-executor), not just Tokio. Co-Authored-By: Claude Opus 4.6 --- tests/sqlite/rw_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sqlite/rw_pool.rs b/tests/sqlite/rw_pool.rs index 016718a58a..d1b320a823 100644 --- a/tests/sqlite/rw_pool.rs +++ b/tests/sqlite/rw_pool.rs @@ -301,7 +301,7 @@ async fn rw_pool_concurrent_reads() -> anyhow::Result<()> { let mut handles = Vec::new(); for _ in 0..4 { let pool = pool.clone(); - handles.push(tokio::spawn(async move { + handles.push(sqlx_core::rt::spawn(async move { let rows = sqlx::query("SELECT count(*) as cnt FROM conc") .fetch_one(&pool) .await @@ -311,7 +311,7 @@ async fn rw_pool_concurrent_reads() -> anyhow::Result<()> { } for handle in handles { - let count = handle.await?; + let count = handle.await; assert_eq!(count, 10); } From 704c1a4168ae646d15dbd3b6f280f0d523d8a171 Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 12:34:45 -0500 Subject: [PATCH 5/6] refactor: remove auto-routing from SqliteRwPool Users must now explicitly call .reader() or .writer() to route queries. The Executor impl on &SqliteRwPool always delegates to the writer pool. This removes the SQL classification heuristics and simplifies the Executor impl to a direct delegation. Co-Authored-By: Claude Opus 4.6 --- sqlx-sqlite/src/rw_pool.rs | 306 +++---------------------------------- tests/sqlite/rw_pool.rs | 123 +-------------- 2 files changed, 23 insertions(+), 406 deletions(-) diff --git a/sqlx-sqlite/src/rw_pool.rs b/sqlx-sqlite/src/rw_pool.rs index d40a11d803..61a996caca 100644 --- a/sqlx-sqlite/src/rw_pool.rs +++ b/sqlx-sqlite/src/rw_pool.rs @@ -4,7 +4,7 @@ use crate::{ }; use sqlx_core::acquire::Acquire; -use sqlx_core::error::{BoxDynError, Error}; +use sqlx_core::error::Error; use sqlx_core::executor::{Execute, Executor}; use sqlx_core::pool::{MaybePoolConnection, Pool, PoolConnection, PoolOptions}; use sqlx_core::sql_str::SqlStr; @@ -13,61 +13,9 @@ use sqlx_core::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::TryStreamExt; use std::fmt; -// ─── SQL Classification ──────────────────────────────────────────────────────── - -/// Split `sql` into "words" (maximal runs of alphanumeric/underscore characters). -fn sql_words(sql: &str) -> impl Iterator { - sql.split(|c: char| !c.is_ascii_alphanumeric() && c != '_') - .filter(|w| !w.is_empty()) -} - -/// Check if any word in `sql` matches `keyword` (case-insensitive). -fn has_keyword(sql: &str, keyword: &str) -> bool { - sql_words(sql).any(|w| w.eq_ignore_ascii_case(keyword)) -} - -/// Determines whether a SQL statement should be routed to the read pool. -/// -/// Uses conservative heuristics — only definitively read-only statements -/// return `true`. Everything ambiguous routes to the writer (safe default). -/// -/// ## Routing rules -/// -/// - `SELECT` / `EXPLAIN` → always reader (cannot write in SQLite) -/// - `PRAGMA` without `=` → reader (read-only PRAGMA) -/// - `WITH` CTE → reader **only if** no write keywords appear anywhere -/// - Everything else → writer -#[inline] -pub(crate) fn is_read_only_sql(sql: &str) -> bool { - let first = match sql_words(sql).next() { - Some(w) => w, - None => return false, - }; - - if first.eq_ignore_ascii_case("SELECT") || first.eq_ignore_ascii_case("EXPLAIN") { - return true; - } - - if first.eq_ignore_ascii_case("PRAGMA") && !sql.contains('=') { - return true; - } - - // WITH CTEs: read-only if no write keywords appear anywhere. - // Safe false negatives: write keywords in string literals → routes to writer. - if first.eq_ignore_ascii_case("WITH") { - return !has_keyword(sql, "INSERT") - && !has_keyword(sql, "UPDATE") - && !has_keyword(sql, "DELETE") - && !has_keyword(sql, "REPLACE"); - } - - false -} - // ─── SqliteRwPoolOptions ─────────────────────────────────────────────────────── /// Builder for [`SqliteRwPool`]. @@ -98,7 +46,6 @@ pub struct SqliteRwPoolOptions { writer_connect_options: Option, reader_pool_options: Option>, writer_pool_options: Option>, - auto_route: bool, checkpoint_on_close: bool, } @@ -113,7 +60,6 @@ impl SqliteRwPoolOptions { /// /// Defaults: /// - `max_readers`: number of available CPUs (or 4 if unavailable) - /// - `auto_route`: `true` /// - `checkpoint_on_close`: `true` pub fn new() -> Self { Self { @@ -122,7 +68,6 @@ impl SqliteRwPoolOptions { writer_connect_options: None, reader_pool_options: None, writer_pool_options: None, - auto_route: true, checkpoint_on_close: true, } } @@ -168,16 +113,6 @@ impl SqliteRwPoolOptions { self } - /// Enable or disable automatic SQL-based routing. - /// - /// When enabled (the default), the [`Executor`] impl inspects each query's SQL - /// to decide whether to use the reader or writer pool. When disabled, all - /// queries go to the writer; readers are only used via [`SqliteRwPool::reader()`]. - pub fn auto_route(mut self, auto_route: bool) -> Self { - self.auto_route = auto_route; - self - } - /// Run `PRAGMA wal_checkpoint(PASSIVE)` on close. /// /// Enabled by default. This flushes as much WAL data as possible to the @@ -244,7 +179,6 @@ impl SqliteRwPoolOptions { Ok(SqliteRwPool { read_pool, write_pool, - auto_route: self.auto_route, checkpoint_on_close: self.checkpoint_on_close, }) } @@ -261,14 +195,9 @@ impl SqliteRwPoolOptions { /// - A **writer pool** with a single connection for all write operations /// - A **reader pool** with multiple read-only connections for queries /// -/// # Auto-Routing -/// -/// When enabled (the default), the [`Executor`] impl inspects each query's SQL -/// and routes it to the appropriate pool: -/// -/// - `SELECT`, `EXPLAIN`, read-only `PRAGMA` → reader pool -/// - `WITH` CTEs without write keywords → reader pool -/// - Everything else → writer pool +/// Use [`reader()`](SqliteRwPool::reader) and [`writer()`](SqliteRwPool::writer) +/// to explicitly route queries to the appropriate pool. The [`Acquire`] trait +/// and the [`Executor`] impl always use the writer pool as a safe default. /// /// # WAL Mode /// @@ -292,14 +221,14 @@ impl SqliteRwPoolOptions { /// /// let pool = SqliteRwPool::connect("sqlite://data.db").await?; /// -/// // SELECT → automatically routed to reader pool +/// // Reads go through the reader pool /// let rows = sqlx::query("SELECT * FROM users") -/// .fetch_all(&pool).await?; +/// .fetch_all(pool.reader()).await?; /// -/// // INSERT → automatically routed to writer pool +/// // Writes go through the writer pool /// sqlx::query("INSERT INTO users (name) VALUES (?)") /// .bind("Alice") -/// .execute(&pool).await?; +/// .execute(pool.writer()).await?; /// /// pool.close().await; /// # Ok(()) @@ -309,7 +238,6 @@ impl SqliteRwPoolOptions { pub struct SqliteRwPool { read_pool: Pool, write_pool: Pool, - auto_route: bool, checkpoint_on_close: bool, } @@ -330,7 +258,8 @@ impl SqliteRwPool { /// Get a reference to the underlying reader pool. /// - /// Useful for explicitly routing queries to readers, bypassing auto-routing. + /// Use this to explicitly route read queries to the reader pool for + /// concurrent read access. /// /// # Note /// @@ -428,7 +357,6 @@ impl fmt::Debug for SqliteRwPool { f.debug_struct("SqliteRwPool") .field("read_pool", &self.read_pool) .field("write_pool", &self.write_pool) - .field("auto_route", &self.auto_route) .field("checkpoint_on_close", &self.checkpoint_on_close) .finish() } @@ -436,41 +364,14 @@ impl fmt::Debug for SqliteRwPool { // ─── Executor impl ───────────────────────────────────────────────────────────── -/// Carries decomposed query parts for re-delegation to a connection's [`Executor`] impl. -/// -/// This allows the `SqliteRwPool` to inspect the SQL for routing before delegating -/// execution to the appropriate pool's connection, without coupling to -/// `SqliteConnection` internals. -struct RoutedQuery { - sql: SqlStr, - arguments: Option, - persistent: bool, -} - -impl Execute<'_, Sqlite> for RoutedQuery { - fn sql(self) -> SqlStr { - self.sql - } - - fn statement(&self) -> Option<&SqliteStatement> { - None - } - - fn take_arguments(&mut self) -> Result, BoxDynError> { - Ok(self.arguments.take()) - } - - fn persistent(&self) -> bool { - self.persistent - } -} - +/// All queries executed directly on `&SqliteRwPool` go to the writer pool. +/// Use [`SqliteRwPool::reader()`] to explicitly route reads to the reader pool. impl<'p> Executor<'p> for &SqliteRwPool { type Database = Sqlite; fn fetch_many<'e, 'q, E>( self, - mut query: E, + query: E, ) -> BoxStream<'e, Result, Error>> where 'p: 'e, @@ -478,31 +379,12 @@ impl<'p> Executor<'p> for &SqliteRwPool { 'q: 'e, E: 'q, { - let pool = self.clone(); - - Box::pin(try_stream! { - let arguments = query.take_arguments().map_err(Error::Encode)?; - let persistent = query.persistent(); - let sql = query.sql(); - - let use_reader = pool.auto_route && is_read_only_sql(sql.as_str()); - let target_pool = if use_reader { &pool.read_pool } else { &pool.write_pool }; - let mut conn = target_pool.acquire().await?; - - let routed = RoutedQuery { sql, arguments, persistent }; - let mut s = conn.fetch_many(routed); - - while let Some(v) = s.try_next().await? { - r#yield!(v); - } - - Ok(()) - }) + (&self.write_pool).fetch_many(query) } fn fetch_optional<'e, 'q, E>( self, - mut query: E, + query: E, ) -> BoxFuture<'e, Result, Error>> where 'p: 'e, @@ -510,28 +392,7 @@ impl<'p> Executor<'p> for &SqliteRwPool { 'q: 'e, E: 'q, { - let pool = self.clone(); - - Box::pin(async move { - let arguments = query.take_arguments().map_err(Error::Encode)?; - let persistent = query.persistent(); - let sql = query.sql(); - - let use_reader = pool.auto_route && is_read_only_sql(sql.as_str()); - let target_pool = if use_reader { - &pool.read_pool - } else { - &pool.write_pool - }; - let mut conn = target_pool.acquire().await?; - - let routed = RoutedQuery { - sql, - arguments, - persistent, - }; - conn.fetch_optional(routed).await - }) + (&self.write_pool).fetch_optional(query) } fn prepare_with<'e>( @@ -542,9 +403,7 @@ impl<'p> Executor<'p> for &SqliteRwPool { where 'p: 'e, { - let pool = self.write_pool.clone(); - - Box::pin(async move { pool.acquire().await?.prepare_with(sql, parameters).await }) + (&self.write_pool).prepare_with(sql, parameters) } #[doc(hidden)] @@ -556,9 +415,7 @@ impl<'p> Executor<'p> for &SqliteRwPool { where 'p: 'e, { - let pool = self.write_pool.clone(); - - Box::pin(async move { pool.acquire().await?.describe(sql).await }) + (&self.write_pool).describe(sql) } } @@ -588,130 +445,3 @@ impl<'a> Acquire<'a> for &SqliteRwPool { } } -// ─── Tests ───────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - // SELECT variants - #[test] - fn select_is_read_only() { - assert!(is_read_only_sql("SELECT * FROM users")); - assert!(is_read_only_sql("select * from users")); - assert!(is_read_only_sql("Select 1")); - assert!(is_read_only_sql(" SELECT 1")); - assert!(is_read_only_sql("\n\tSELECT 1")); - assert!(is_read_only_sql( - "SELECT count(*) FROM orders WHERE status = 'active'" - )); - } - - // EXPLAIN - #[test] - fn explain_is_read_only() { - assert!(is_read_only_sql("EXPLAIN SELECT 1")); - assert!(is_read_only_sql("EXPLAIN QUERY PLAN SELECT * FROM users")); - assert!(is_read_only_sql("explain query plan select 1")); - } - - // PRAGMA - #[test] - fn pragma_routing() { - // Read-only PRAGMAs (no =) - assert!(is_read_only_sql("PRAGMA journal_mode")); - assert!(is_read_only_sql("PRAGMA table_info(users)")); - assert!(is_read_only_sql("pragma page_count")); - - // Write PRAGMAs (with =) - assert!(!is_read_only_sql("PRAGMA journal_mode = WAL")); - assert!(!is_read_only_sql("PRAGMA synchronous = NORMAL")); - } - - // WITH CTEs - #[test] - fn with_cte_routing() { - // Read-only CTEs - assert!(is_read_only_sql("WITH t AS (SELECT 1) SELECT * FROM t")); - assert!(is_read_only_sql( - "WITH RECURSIVE cte(n) AS (SELECT 1 UNION ALL SELECT n+1 FROM cte WHERE n < 10) SELECT * FROM cte" - )); - - // Write CTEs - assert!(!is_read_only_sql( - "WITH t AS (SELECT 1) INSERT INTO foo SELECT * FROM t" - )); - assert!(!is_read_only_sql( - "WITH t AS (SELECT 1) UPDATE foo SET bar = 1" - )); - assert!(!is_read_only_sql("WITH t AS (SELECT 1) DELETE FROM foo")); - assert!(!is_read_only_sql( - "WITH t AS (SELECT 1) REPLACE INTO foo SELECT * FROM t" - )); - } - - // Write operations → writer - #[test] - fn write_operations_are_not_read_only() { - assert!(!is_read_only_sql("INSERT INTO users VALUES (1)")); - assert!(!is_read_only_sql("UPDATE users SET name = 'Bob'")); - assert!(!is_read_only_sql("DELETE FROM users")); - assert!(!is_read_only_sql("REPLACE INTO users VALUES (1)")); - assert!(!is_read_only_sql("CREATE TABLE foo (id INT)")); - assert!(!is_read_only_sql("DROP TABLE foo")); - assert!(!is_read_only_sql("ALTER TABLE foo ADD COLUMN bar INT")); - assert!(!is_read_only_sql("CREATE INDEX idx ON foo(bar)")); - } - - // Transaction control → writer - #[test] - fn transaction_control_is_not_read_only() { - assert!(!is_read_only_sql("BEGIN")); - assert!(!is_read_only_sql("BEGIN TRANSACTION")); - assert!(!is_read_only_sql("COMMIT")); - assert!(!is_read_only_sql("ROLLBACK")); - assert!(!is_read_only_sql("SAVEPOINT sp1")); - } - - // Edge cases - #[test] - fn edge_cases() { - // Empty / whitespace - assert!(!is_read_only_sql("")); - assert!(!is_read_only_sql(" ")); - - // Keywords as substrings should NOT match - assert!(!is_read_only_sql("SELECTIVITY_CHECK()")); - - // ATTACH / DETACH → writer - assert!(!is_read_only_sql("ATTACH DATABASE ':memory:' AS db2")); - assert!(!is_read_only_sql("DETACH DATABASE db2")); - - // False negative: write keyword in string literal → safely routes to writer - assert!(!is_read_only_sql( - "WITH t AS (SELECT 'DELETE') SELECT * FROM t" - )); - } - - // Word boundary checks - #[test] - fn word_boundary_checks() { - // "SELECTED" should not match "SELECT" - assert!(!is_read_only_sql("SELECTED * FROM foo")); - - // "SELECTS" should not match "SELECT" - assert!(!is_read_only_sql("SELECTS something")); - - // "INSERTS" in a WITH should not trigger the INSERT guard - // (but it also won't match as a standalone keyword) - assert!(is_read_only_sql( - "WITH t AS (SELECT * FROM inserts_log) SELECT * FROM t" - )); - - // "UPDATE" as a column name in a table won't match because - // it would need word boundaries - assert!(is_read_only_sql( - "WITH t AS (SELECT updated FROM foo) SELECT * FROM t" - )); - } -} diff --git a/tests/sqlite/rw_pool.rs b/tests/sqlite/rw_pool.rs index d1b320a823..36c9b56a66 100644 --- a/tests/sqlite/rw_pool.rs +++ b/tests/sqlite/rw_pool.rs @@ -28,42 +28,6 @@ async fn rw_pool_connect_and_close() -> anyhow::Result<()> { Ok(()) } -// ─── Auto-routing: SELECT goes to reader, INSERT goes to writer ──────────────── - -#[sqlx_macros::test] -async fn rw_pool_auto_routes_queries() -> anyhow::Result<()> { - let (opts, _dir) = temp_db_opts(); - let pool = SqliteRwPool::connect_with(opts).await?; - - // Create table via writer (auto-routed) - sqlx::query("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)") - .execute(&pool) - .await?; - - // INSERT via writer (auto-routed) - sqlx::query("INSERT INTO users (name) VALUES (?)") - .bind("Alice") - .execute(&pool) - .await?; - - sqlx::query("INSERT INTO users (name) VALUES (?)") - .bind("Bob") - .execute(&pool) - .await?; - - // SELECT via reader (auto-routed) - let rows = sqlx::query("SELECT name FROM users ORDER BY name") - .fetch_all(&pool) - .await?; - - assert_eq!(rows.len(), 2); - assert_eq!(rows[0].get::("name"), "Alice"); - assert_eq!(rows[1].get::("name"), "Bob"); - - pool.close().await; - Ok(()) -} - // ─── Transactions always use writer ──────────────────────────────────────────── #[sqlx_macros::test] @@ -199,83 +163,6 @@ async fn rw_pool_reader_rejects_writes() -> anyhow::Result<()> { Ok(()) } -// ─── PRAGMA routing ──────────────────────────────────────────────────────────── - -#[sqlx_macros::test] -async fn rw_pool_pragma_routing() -> anyhow::Result<()> { - let (opts, _dir) = temp_db_opts(); - let pool = SqliteRwPool::connect_with(opts).await?; - - // Read-only PRAGMA (no =) should succeed (routes to reader) - let row = sqlx::query("PRAGMA journal_mode").fetch_one(&pool).await?; - let mode = row.get::(0); - assert_eq!(mode.to_lowercase(), "wal"); - - pool.close().await; - Ok(()) -} - -// ─── WITH CTE routing ────────────────────────────────────────────────────────── - -#[sqlx_macros::test] -async fn rw_pool_with_cte_routing() -> anyhow::Result<()> { - let (opts, _dir) = temp_db_opts(); - let pool = SqliteRwPool::connect_with(opts).await?; - - sqlx::query("CREATE TABLE cte_test (id INTEGER PRIMARY KEY, val INTEGER)") - .execute(&pool) - .await?; - sqlx::query("INSERT INTO cte_test (val) VALUES (10), (20), (30)") - .execute(&pool) - .await?; - - // Read-only WITH CTE → reader - let rows = sqlx::query("WITH t AS (SELECT val FROM cte_test WHERE val > 10) SELECT * FROM t") - .fetch_all(&pool) - .await?; - assert_eq!(rows.len(), 2); - - pool.close().await; - Ok(()) -} - -// ─── Auto-route disabled ─────────────────────────────────────────────────────── - -#[sqlx_macros::test] -async fn rw_pool_auto_route_disabled() -> anyhow::Result<()> { - let (opts, _dir) = temp_db_opts(); - let pool = SqliteRwPoolOptions::new() - .auto_route(false) - .max_readers(2) - .connect_with(opts) - .await?; - - // Even SELECTs go to writer when auto_route is disabled - sqlx::query("CREATE TABLE no_route (id INTEGER PRIMARY KEY)") - .execute(&pool) - .await?; - - sqlx::query("INSERT INTO no_route (id) VALUES (1)") - .execute(&pool) - .await?; - - let row = sqlx::query("SELECT id FROM no_route") - .fetch_one(&pool) - .await?; - assert_eq!(row.get::("id"), 1); - - // Explicit reader still works - let mut reader = pool.acquire_reader().await?; - let rows = sqlx::query("SELECT id FROM no_route") - .fetch_all(&mut *reader) - .await?; - assert_eq!(rows.len(), 1); - - drop(reader); - pool.close().await; - Ok(()) -} - // ─── Concurrent reads don't block ────────────────────────────────────────────── #[sqlx_macros::test] @@ -287,23 +174,23 @@ async fn rw_pool_concurrent_reads() -> anyhow::Result<()> { .await?; sqlx::query("CREATE TABLE conc (id INTEGER PRIMARY KEY, val TEXT)") - .execute(&pool) + .execute(pool.writer()) .await?; for i in 0..10 { sqlx::query("INSERT INTO conc (val) VALUES (?)") .bind(format!("item_{i}")) - .execute(&pool) + .execute(pool.writer()) .await?; } - // Spawn multiple concurrent reads + // Spawn multiple concurrent reads via explicit reader pool let mut handles = Vec::new(); for _ in 0..4 { - let pool = pool.clone(); + let reader = pool.reader().clone(); handles.push(sqlx_core::rt::spawn(async move { let rows = sqlx::query("SELECT count(*) as cnt FROM conc") - .fetch_one(&pool) + .fetch_one(&reader) .await .unwrap(); rows.get::("cnt") From 0bd54c60e72da7ba287cc6589b050e4fefda036a Mon Sep 17 00:00:00 2001 From: Evan Schwartz <3262610+emschwartz@users.noreply.github.com> Date: Mon, 23 Feb 2026 12:36:02 -0500 Subject: [PATCH 6/6] cargo fmt --- sqlx-sqlite/src/rw_pool.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sqlx-sqlite/src/rw_pool.rs b/sqlx-sqlite/src/rw_pool.rs index 61a996caca..3d60ae0412 100644 --- a/sqlx-sqlite/src/rw_pool.rs +++ b/sqlx-sqlite/src/rw_pool.rs @@ -382,10 +382,7 @@ impl<'p> Executor<'p> for &SqliteRwPool { (&self.write_pool).fetch_many(query) } - fn fetch_optional<'e, 'q, E>( - self, - query: E, - ) -> BoxFuture<'e, Result, Error>> + fn fetch_optional<'e, 'q, E>(self, query: E) -> BoxFuture<'e, Result, Error>> where 'p: 'e, E: Execute<'q, Sqlite>, @@ -444,4 +441,3 @@ impl<'a> Acquire<'a> for &SqliteRwPool { }) } } -