Skip to content

Commit 7368959

Browse files
authored
Fix Broken Reflection for 1.4 FutureEngine (sqlalchemy-redshift#277)
* fix 269 * Update CHANGES.rst
1 parent 90da72a commit 7368959

File tree

5 files changed

+98
-71
lines changed

5 files changed

+98
-71
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
- Override new upstream postgres method that fails against redshift (`Pull #266 <https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/pull/266>`_)
55
- Fix table reflection broken for non-superusers
66
(`Pull #276 <https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/pull/276>`_)
7+
- Fix Broken Reflection for 1.4 FutureEngine
8+
(`Pull #277 <https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/pull/277>`_)
79

810

911
0.8.13 (2023-03-28)

sqlalchemy_redshift/dialect.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ def get_check_constraints(self, connection, table_name, schema=None, **kw):
753753
)
754754
table_oid = 'NULL' if not table_oid else table_oid
755755

756-
result = connection.execute("""
756+
result = connection.execute(sa.text("""
757757
SELECT
758758
cons.conname as name,
759759
pg_get_constraintdef(cons.oid) as src
@@ -762,7 +762,7 @@ def get_check_constraints(self, connection, table_name, schema=None, **kw):
762762
WHERE
763763
cons.conrelid = {} AND
764764
cons.contype = 'c'
765-
""".format(table_oid))
765+
""".format(table_oid)))
766766
ret = []
767767
for name, src in result:
768768
# samples:
@@ -1122,15 +1122,14 @@ def _get_schema_column_info(self, connection, **kw):
11221122
)
11231123

11241124
all_columns = defaultdict(list)
1125-
with connection.connect() as cc:
1126-
result = cc.execute(sa.text(REFLECTION_SQL.format(
1127-
schema_clause=schema_clause,
1128-
table_clause=table_clause
1129-
)))
1130-
1131-
for col in result:
1132-
key = RelationKey(col.table_name, col.schema, connection)
1133-
all_columns[key].append(col)
1125+
result = connection.execute(sa.text(REFLECTION_SQL.format(
1126+
schema_clause=schema_clause,
1127+
table_clause=table_clause
1128+
)))
1129+
1130+
for col in result:
1131+
key = RelationKey(col.table_name, col.schema, connection)
1132+
all_columns[key].append(col)
11341133

11351134
return dict(all_columns)
11361135

tests/conftest.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,23 @@ def migrate(self, engine):
176176
@contextlib.contextmanager
177177
def _database(self):
178178
from sqlalchemy_redshift.dialect import \
179-
RedshiftDialect_redshift_connector
179+
RedshiftDialect_psycopg2cffi
180180

181181
db_name = database_name()
182-
with self.engine.connect() as conn:
183-
conn.execute('COMMIT') # Can't create databases in a transaction
184-
if isinstance(conn.dialect, RedshiftDialect_redshift_connector):
185-
conn.execution_options(isolation_level="AUTOCOMMIT")
186-
conn.execute('CREATE DATABASE {db_name}'.format(db_name=db_name))
182+
opts = (
183+
{"isolation_level": "AUTOCOMMIT"}
184+
if not isinstance(
185+
self.engine.dialect, RedshiftDialect_psycopg2cffi
186+
)
187+
else {}
188+
)
189+
190+
with self.engine.connect().execution_options(**opts) as conn:
191+
if isinstance(self.engine.dialect, RedshiftDialect_psycopg2cffi):
192+
conn.execute(sa.text("COMMIT"))
193+
conn.execute(
194+
sa.text('CREATE DATABASE {db_name}'.format(db_name=db_name))
195+
)
187196

188197
dburl = copy.deepcopy(self.engine.url)
189198
try:
@@ -197,13 +206,14 @@ def _database(self):
197206
connect_args=self.engine_definition.connect_args,
198207
)
199208
finally:
200-
with self.engine.connect() as conn:
201-
conn.execute('COMMIT') # Can't drop databases in a transaction
209+
with self.engine.connect().execution_options(**opts) as conn:
202210
if isinstance(
203-
conn.dialect, RedshiftDialect_redshift_connector
211+
self.engine.dialect, RedshiftDialect_psycopg2cffi
204212
):
205-
conn.execution_options(isolation_level="AUTOCOMMIT")
206-
conn.execute('DROP DATABASE {db_name}'.format(db_name=db_name))
213+
conn.execute(sa.text("COMMIT"))
214+
conn.execute(
215+
sa.text('DROP DATABASE {db_name}'.format(db_name=db_name))
216+
)
207217

208218
@contextlib.contextmanager
209219
def migrated_database(self):

tests/test_reflection.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import pytest
22
from sqlalchemy import MetaData, Table, inspect
3-
from sqlalchemy.dialects.postgresql.psycopg2cffi import PGDialect_psycopg2cffi
43
from sqlalchemy.schema import CreateTable
54
from sqlalchemy.exc import NoSuchTableError
5+
import sqlalchemy as sa
66

77
from rs_sqla_test_utils import models, utils
88

@@ -195,7 +195,7 @@ def test_no_table_reflection(redshift_session):
195195
def test_no_search_path_leak(redshift_session):
196196
metadata = MetaData(bind=redshift_session.bind)
197197
Table('basic', metadata, autoload=True)
198-
result = redshift_session.execute("SHOW search_path")
198+
result = redshift_session.execute(sa.text("SHOW search_path"))
199199
search_path = result.scalar()
200200
assert 'other_schema' not in search_path
201201

@@ -207,12 +207,12 @@ def test_external_table_reflection(redshift_engine, iam_role_arn):
207207
iam_role '{iam_role_arn}'
208208
create external database if not exists;
209209
"""
210-
211-
conn = redshift_engine.connect()
212-
conn.execute(schema_ddl)
213-
insp = inspect(redshift_engine)
214-
all_schemas = insp.get_schema_names()
215-
assert 'bananas' in all_schemas
210+
with redshift_engine.connect() as conn:
211+
conn.execute(sa.text(schema_ddl))
212+
conn.execute(sa.text('COMMIT'))
213+
insp = inspect(redshift_engine)
214+
all_schemas = insp.get_schema_names()
215+
assert 'bananas' in all_schemas
216216

217217
table_ddl = """create external table bananas.sales(
218218
salesid integer,
@@ -226,13 +226,19 @@ def test_external_table_reflection(redshift_engine, iam_role_arn):
226226
location 's3://awssampledbuswest2/tickit/spectrum/sales/'
227227
table properties ('numRows'='172000');
228228
"""
229-
with redshift_engine.connect() as conn:
230-
# Redshift can't run CREATE EXTERNAL TABLE inside a transaction
231-
# e.g. (BEGIN … END)
232-
if not isinstance(conn.dialect, PGDialect_psycopg2cffi):
233-
conn.execution_options(isolation_level="AUTOCOMMIT")
229+
from sqlalchemy_redshift.dialect import \
230+
RedshiftDialect_psycopg2cffi
231+
opts = (
232+
{"isolation_level": "AUTOCOMMIT"}
233+
if not isinstance(
234+
redshift_engine.dialect, RedshiftDialect_psycopg2cffi
235+
) else {}
236+
)
234237

235-
conn.execute(table_ddl)
238+
with redshift_engine.connect().execution_options(**opts) as conn:
239+
conn.execute(sa.text(table_ddl))
240+
if isinstance(redshift_engine.dialect, RedshiftDialect_psycopg2cffi):
241+
conn.execute(sa.text("COMMIT"))
236242

237243
insp = inspect(redshift_engine)
238244
table_columns_definition = insp.get_columns(
@@ -245,8 +251,12 @@ def test_external_table_reflection(redshift_engine, iam_role_arn):
245251
assert 'pricepaid' in table_columns
246252

247253
# Drop external table because we are using `AUTOCOMMIT`
248-
conn.execute("DROP TABLE IF EXISTS bananas.sales")
254+
conn.execute(sa.text("DROP TABLE IF EXISTS bananas.sales"))
249255

250256
# Also drop the external db:
251257
# https://docs.aws.amazon.com/redshift/latest/dg/r_DROP_DATABASE.html
252-
conn.execute("DROP SCHEMA IF EXISTS bananas DROP EXTERNAL DATABASE")
258+
conn.execute(
259+
sa.text("DROP SCHEMA IF EXISTS bananas DROP EXTERNAL DATABASE")
260+
)
261+
if isinstance(redshift_engine.dialect, RedshiftDialect_psycopg2cffi):
262+
conn.execute(sa.text("COMMIT"))

tests/test_reflection_views.py

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from sqlalchemy import MetaData, Table, inspect
22
from sqlalchemy.schema import CreateTable
3-
3+
import sqlalchemy as sa
44
from rs_sqla_test_utils.utils import clean, compile_query
55

66

@@ -13,42 +13,48 @@ def test_view_reflection(redshift_engine):
1313
table_ddl = "CREATE TABLE my_table (col1 INTEGER, col2 INTEGER)"
1414
view_query = "SELECT my_table.col1, my_table.col2 FROM my_table"
1515
view_ddl = "CREATE VIEW my_view AS %s" % view_query
16-
conn = redshift_engine.connect()
17-
try:
18-
conn.execute(table_ddl)
19-
conn.execute(view_ddl)
20-
insp = inspect(redshift_engine)
21-
view_definition = insp.get_view_definition('my_view')
22-
assert clean(
23-
compile_query(view_definition, redshift_engine.dialect)
24-
) == clean(view_query)
25-
view = Table('my_view', MetaData(),
26-
autoload=True, autoload_with=redshift_engine)
27-
assert(len(view.columns) == 2)
28-
finally:
29-
conn.execute('DROP TABLE IF EXISTS my_table CASCADE')
30-
conn.execute('DROP VIEW IF EXISTS my_view CASCADE')
16+
17+
with redshift_engine.connect() as conn:
18+
try:
19+
conn.execute(sa.text(table_ddl))
20+
conn.execute(sa.text(view_ddl))
21+
conn.execute(sa.text("COMMIT"))
22+
insp = inspect(redshift_engine)
23+
view_definition = insp.get_view_definition('my_view')
24+
assert clean(
25+
compile_query(view_definition, redshift_engine.dialect)
26+
) == clean(view_query)
27+
view = Table('my_view', MetaData(),
28+
autoload=True, autoload_with=redshift_engine)
29+
assert(len(view.columns) == 2)
30+
finally:
31+
conn.execute(sa.text('DROP TABLE IF EXISTS my_table CASCADE'))
32+
conn.execute(sa.text('DROP VIEW IF EXISTS my_view CASCADE'))
33+
conn.execute(sa.text("COMMIT"))
3134

3235

3336
def test_late_binding_view_reflection(redshift_engine):
3437
table_ddl = "CREATE TABLE my_table (col1 INTEGER, col2 INTEGER)"
3538
view_query = "SELECT my_table.col1, my_table.col2 FROM public.my_table"
3639
view_ddl = ("CREATE VIEW my_late_view AS "
3740
"%s WITH NO SCHEMA BINDING" % view_query)
38-
conn = redshift_engine.connect()
39-
try:
40-
conn.execute(table_ddl)
41-
conn.execute(view_ddl)
42-
insp = inspect(redshift_engine)
43-
view_definition = insp.get_view_definition('my_late_view')
44-
45-
# Redshift returns the entire DDL for late binding views.
46-
assert clean(
47-
compile_query(view_definition, redshift_engine.dialect)
48-
) == clean(view_ddl)
49-
view = Table('my_late_view', MetaData(),
50-
autoload=True, autoload_with=redshift_engine)
51-
assert(len(view.columns) == 2)
52-
finally:
53-
conn.execute('DROP TABLE IF EXISTS my_table CASCADE')
54-
conn.execute('DROP VIEW IF EXISTS my_late_view CASCADE')
41+
42+
with redshift_engine.connect() as conn:
43+
try:
44+
conn.execute(sa.text(table_ddl))
45+
conn.execute(sa.text(view_ddl))
46+
conn.execute(sa.text("COMMIT"))
47+
insp = inspect(redshift_engine)
48+
view_definition = insp.get_view_definition('my_late_view')
49+
50+
# Redshift returns the entire DDL for late binding views.
51+
assert clean(
52+
compile_query(view_definition, redshift_engine.dialect)
53+
) == clean(view_ddl)
54+
view = Table('my_late_view', MetaData(),
55+
autoload=True, autoload_with=redshift_engine)
56+
assert(len(view.columns) == 2)
57+
finally:
58+
conn.execute(sa.text('DROP TABLE IF EXISTS my_table CASCADE'))
59+
conn.execute(sa.text('DROP VIEW IF EXISTS my_late_view CASCADE'))
60+
conn.execute(sa.text('COMMIT'))

0 commit comments

Comments
 (0)