Skip to content
72 changes: 72 additions & 0 deletions docs/examples/patterns/migrations_with_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from pathlib import Path

__all__ = ("test_migrations_with_schema",)


def test_migrations_with_schema(tmp_path: Path) -> None:
# start-example
from sqlspec.adapters.duckdb import DuckDBConfig
from sqlspec.migrations.commands import SyncMigrationCommands

migration_dir = tmp_path / "migrations"
db_path = tmp_path / "app.duckdb"

config = DuckDBConfig(
connection_config={"database": str(db_path)},
migration_config={
"script_location": str(migration_dir),
"version_table_name": "schema_versions",
"default_schema": "app_schema",
"version_table_schema": "admin_schema",
},
)

try:
with config.provide_session() as session:
session.execute("CREATE SCHEMA app_schema")
session.execute("CREATE SCHEMA admin_schema")

commands = SyncMigrationCommands(config)
commands.init(str(migration_dir), package=True)

(migration_dir / "0001_create_users.py").write_text(
'''"""Create users."""


def up():
"""Create an unqualified table in app_schema."""
return ["CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR NOT NULL)"]


def down():
"""Drop the unqualified table from app_schema."""
return ["DROP TABLE IF EXISTS users"]
'''
)

commands.upgrade()

with config.provide_session() as session:
users_table = session.select_value(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = ? AND table_name = ?
""",
("app_schema", "users"),
)
tracker_table = session.select_value(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = ? AND table_name = ?
""",
("admin_schema", "schema_versions"),
)

assert users_table == "users"
assert tracker_table == "schema_versions"
finally:
if config.connection_instance:
config.close_pool()
# end-example
59 changes: 59 additions & 0 deletions docs/usage/migrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,65 @@ specific extension, ``migration_config["include_extensions"]`` to opt in
explicitly by extension name, or ``migration_config["enabled"] = False`` to
disable migrations entirely for a database config.

Configuring a Default Schema
----------------------------

Use ``migration_config["default_schema"]`` when migration SQL should run
against a pre-existing schema without qualifying every table in each migration
file. SQLSpec validates the schema before creating the tracker table or applying
DDL, then configures the migration session before each migration is executed.

Use ``migration_config["version_table_schema"]`` when the migration tracker
table should live somewhere different from the objects managed by migrations.
If ``version_table_schema`` is not set, the tracker schema resolves to
``default_schema``. If neither field is set, the tracker table is unqualified and
uses the adapter's normal default namespace.

.. code-block:: python

from sqlspec.adapters.asyncpg import AsyncpgConfig

config = AsyncpgConfig(
connection_config={"dsn": "postgresql://localhost/app"},
migration_config={
"script_location": "migrations/postgres",
"version_table_name": "schema_versions",
"default_schema": "app_schema",
"version_table_schema": "admin_schema",
},
)

The operator must create the target schema before running migrations. The
migration role also needs the database-specific privileges to create objects
there. For PostgreSQL, that usually means ``USAGE`` and
``CREATE`` on the target schema, plus permission to create or update the
tracker table.

Adapter support:

.. list-table::
:header-rows: 1

* - Adapter
- Behavior
* - ``asyncpg``, ``psycopg``, ``psqlpy``, ADBC PostgreSQL
- Uses PostgreSQL ``search_path`` and validates ``information_schema.schemata``.
* - ``oracledb``
- Uses ``ALTER SESSION SET CURRENT_SCHEMA`` and validates Oracle users.
* - ``duckdb``
- Uses ``SET search_path`` and validates ``information_schema.schemata``.
* - ``sqlite``, ``aiosqlite``, ``asyncmy``
- Accept the setting as an explicit no-op and log that default schemas are unsupported.
* - ADBC SQL Server
- Accepts the setting as a no-op; configure the default schema at the user or login level.

Example with unqualified DDL:

.. literalinclude:: /examples/patterns/migrations_with_schema.py
:language: python
:start-after: # start-example
:end-before: # end-example

Logging and Echo Controls
-------------------------

Expand Down
5 changes: 4 additions & 1 deletion sqlspec/adapters/adbc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,11 @@ def __init__(
self._pgvector_available: bool | None = None
self._paradedb_available: bool | None = None

dialect = resolve_dialect_from_config(self.connection_config)
object.__setattr__(self, "supports_migration_schemas", is_postgres_dialect(dialect))

if statement_config is None:
statement_config = get_statement_config(resolve_dialect_from_config(self.connection_config))
statement_config = get_statement_config(dialect)

statement_config, driver_features = apply_driver_features(statement_config, driver_features)

Expand Down
47 changes: 47 additions & 0 deletions sqlspec/adapters/adbc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlspec.core import SQL, StatementConfig, build_arrow_result_from_table, get_cache_config, register_driver_profile
from sqlspec.driver import BaseSyncExceptionHandler, SyncDriverAdapterBase
from sqlspec.exceptions import DatabaseConnectionError, SQLSpecError
from sqlspec.migrations.utils import quote_migration_identifier
from sqlspec.utils.logging import get_logger
from sqlspec.utils.module_loader import ensure_pyarrow
from sqlspec.utils.serializers import to_json
Expand Down Expand Up @@ -282,6 +283,52 @@ def rollback(self) -> None:
msg = f"Failed to rollback transaction: {e}"
raise SQLSpecError(msg) from e

def set_migration_session_schema(self, schema: str) -> None:
"""Set the PostgreSQL search path for migration SQL when using ADBC PostgreSQL."""
if not self._is_postgres:
super().set_migration_session_schema(schema)
if self._dialect_name in {"mssql", "sqlserver", "tsql"}:
logger.debug(
"SQL Server schema support not yet implemented for ADBC; configure default schema at the "
"user/login level; ignoring default_schema=%r",
schema,
)
else:
logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", "ADBC", schema)
return
quoted_schema = quote_migration_identifier(schema)
with self.with_cursor(self.connection) as cursor:
cursor.execute(f'SET search_path TO {quoted_schema}, "$user", public')

def set_migration_non_transactional_schema(self, schema: str) -> None:
"""Set the PostgreSQL search path for non-transactional migration SQL."""
self.set_migration_session_schema(schema)

def reset_migration_session_schema(self) -> None:
"""Reset PostgreSQL search path after non-transactional migration SQL."""
if not self._is_postgres:
super().reset_migration_session_schema()
return
with self.with_cursor(self.connection) as cursor:
cursor.execute("RESET search_path")

def has_schema(self, schema: str) -> bool:
"""Return whether a PostgreSQL schema exists when using ADBC PostgreSQL."""
if not self._is_postgres:
super().has_schema(schema)
if self._dialect_name in {"mssql", "sqlserver", "tsql"}:
logger.debug(
"SQL Server schema support not yet implemented for ADBC; configure default schema at the "
"user/login level; accepting default_schema=%r",
schema,
)
else:
logger.debug("%s driver does not support default schemas; accepting default_schema=%r", "ADBC", schema)
return True
with self.with_cursor(self.connection) as cursor:
cursor.execute("SELECT 1 FROM information_schema.schemata WHERE schema_name = $1", parameters=[schema])
return cursor.fetchone() is not None

def with_cursor(self, connection: "AdbcConnection") -> "AdbcCursor":
"""Create context manager for cursor.

Expand Down
14 changes: 14 additions & 0 deletions sqlspec/adapters/aiosqlite/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from sqlspec.core import ArrowResult, get_cache_config, register_driver_profile
from sqlspec.driver import AsyncDriverAdapterBase, BaseAsyncExceptionHandler
from sqlspec.exceptions import SQLSpecError
from sqlspec.utils.logging import get_logger

if TYPE_CHECKING:
from sqlspec.adapters.aiosqlite._typing import AiosqliteConnection
Expand All @@ -47,6 +48,8 @@
SQLITE_IOERR_CODE = 10
SQLITE_MISMATCH_CODE = 20

logger = get_logger(__name__)


class AiosqliteExceptionHandler(BaseAsyncExceptionHandler):
"""Async context manager for handling aiosqlite database exceptions.
Expand Down Expand Up @@ -181,6 +184,17 @@ async def rollback(self) -> None:
msg = f"Failed to rollback transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
"""Ignore migration default schema for aiosqlite."""
await super().set_migration_session_schema(schema)
logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", "aiosqlite", schema)

async def has_schema(self, schema: str) -> bool:
"""Return True because SQLite has no separate schema namespace."""
await super().has_schema(schema)
logger.debug("%s driver does not support default schemas; accepting default_schema=%r", "aiosqlite", schema)
return True

def with_cursor(self, connection: "AiosqliteConnection") -> "AiosqliteCursor":
"""Create async context manager for AIOSQLite cursor."""
return AiosqliteCursor(connection)
Expand Down
11 changes: 11 additions & 0 deletions sqlspec/adapters/asyncmy/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ async def rollback(self) -> None:
msg = f"Failed to rollback MySQL transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
"""Ignore migration default schema for asyncmy."""
await super().set_migration_session_schema(schema)
logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", "asyncmy", schema)

async def has_schema(self, schema: str) -> bool:
"""Return True because asyncmy does not manage migration default schemas."""
await super().has_schema(schema)
logger.debug("%s driver does not support default schemas; accepting default_schema=%r", "asyncmy", schema)
return True

def with_cursor(self, connection: "AsyncmyConnection") -> "AsyncmyCursor":
"""Create cursor context manager for the connection.

Expand Down
1 change: 1 addition & 0 deletions sqlspec/adapters/asyncpg/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class AsyncpgConfig(AsyncDatabaseConfig[AsyncpgConnection, "Pool[Record]", Async
driver_type: "ClassVar[type[AsyncpgDriver]]" = AsyncpgDriver
connection_type: "ClassVar[type[AsyncpgConnection]]" = type(AsyncpgConnection) # type: ignore[assignment]
supports_transactional_ddl: "ClassVar[bool]" = True
supports_migration_schemas: "ClassVar[bool]" = True
supports_native_arrow_export: "ClassVar[bool]" = True
supports_native_arrow_import: "ClassVar[bool]" = True
supports_native_parquet_export: "ClassVar[bool]" = True
Expand Down
21 changes: 21 additions & 0 deletions sqlspec/adapters/asyncpg/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
describe_stack_statement,
)
from sqlspec.exceptions import SQLSpecError, StackExecutionError
from sqlspec.migrations.utils import quote_migration_identifier
from sqlspec.utils.logging import get_logger
from sqlspec.utils.type_guards import has_sqlstate

Expand Down Expand Up @@ -228,6 +229,26 @@ async def rollback(self) -> None:
msg = f"Failed to rollback async transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
"""Set the PostgreSQL search path for migration SQL."""
quoted_schema = quote_migration_identifier(schema)
await self.connection.execute(f'SET LOCAL search_path TO {quoted_schema}, "$user", public')

async def set_migration_non_transactional_schema(self, schema: str) -> None:
"""Set the PostgreSQL search path for non-transactional migration SQL."""
quoted_schema = quote_migration_identifier(schema)
await self.connection.execute(f'SET search_path TO {quoted_schema}, "$user", public')

async def reset_migration_session_schema(self) -> None:
"""Reset the PostgreSQL search path after non-transactional migration SQL."""
await self.connection.execute("RESET search_path")

async def has_schema(self, schema: str) -> bool:
"""Return whether a PostgreSQL schema exists."""
return bool(
await self.connection.fetchval("SELECT 1 FROM information_schema.schemata WHERE schema_name = $1", schema)
)

def with_cursor(self, connection: "AsyncpgConnection") -> "AsyncpgCursor":
"""Create context manager for AsyncPG cursor."""
return AsyncpgCursor(connection)
Expand Down
1 change: 1 addition & 0 deletions sqlspec/adapters/duckdb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class DuckDBConfig(SyncDatabaseConfig[DuckDBConnection, DuckDBConnectionPool, Du
driver_type: "ClassVar[type[DuckDBDriver]]" = DuckDBDriver
connection_type: "ClassVar[type[DuckDBConnection]]" = DuckDBConnection
supports_transactional_ddl: "ClassVar[bool]" = True
supports_migration_schemas: "ClassVar[bool]" = True
supports_native_arrow_export: "ClassVar[bool]" = True
supports_native_arrow_import: "ClassVar[bool]" = True
supports_native_parquet_export: "ClassVar[bool]" = True
Expand Down
16 changes: 16 additions & 0 deletions sqlspec/adapters/duckdb/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
_type_converter = DuckDBOutputConverter()


def _quote_duckdb_search_path(schema: str) -> str:
"""Return a DuckDB string literal for SET search_path."""
return "'" + schema.replace("'", "''") + "'"


class DuckDBExceptionHandler(BaseSyncExceptionHandler):
"""Context manager for handling DuckDB database exceptions.

Expand Down Expand Up @@ -210,6 +215,17 @@ def rollback(self) -> None:
msg = f"Failed to rollback DuckDB transaction: {e}"
raise SQLSpecError(msg) from e

def set_migration_session_schema(self, schema: str) -> None:
"""Set DuckDB search_path for migration SQL."""
self.connection.execute(f"SET search_path = {_quote_duckdb_search_path(schema)}")

def has_schema(self, schema: str) -> bool:
"""Return whether a DuckDB schema exists."""
result = self.connection.execute(
"SELECT 1 FROM information_schema.schemata WHERE schema_name = ?", [schema]
).fetchone()
return result is not None

def with_cursor(self, connection: "DuckDBConnection") -> "DuckDBCursor":
"""Create context manager for DuckDB cursor.

Expand Down
2 changes: 2 additions & 0 deletions sqlspec/adapters/oracledb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class OracleSyncConfig(SyncDatabaseConfig[OracleSyncConnection, "OracleSyncConne
connection_type: "ClassVar[type[OracleSyncConnection]]" = OracleSyncConnection
migration_tracker_type: "ClassVar[type[OracleSyncMigrationTracker]]" = OracleSyncMigrationTracker
supports_transactional_ddl: ClassVar[bool] = False
supports_migration_schemas: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
Expand Down Expand Up @@ -422,6 +423,7 @@ class OracleAsyncConfig(AsyncDatabaseConfig[OracleAsyncConnection, "OracleAsyncC
driver_type: ClassVar[type[OracleAsyncDriver]] = OracleAsyncDriver
migration_tracker_type: "ClassVar[type[OracleAsyncMigrationTracker]]" = OracleAsyncMigrationTracker
supports_transactional_ddl: ClassVar[bool] = False
supports_migration_schemas: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
Expand Down
Loading
Loading