Commit 6fbf536

feat: Adds ability to connect to datashare databases for clusters and serverless workgroups running the PREVIEW_2023 track
1 parent d3c9427 commit 6fbf536

5 files changed: +234 -43 lines changed
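
For context, a minimal usage sketch of what this change enables: connecting directly to a datashare database on a consumer cluster or serverless workgroup and querying a shared object. This is not part of the commit; the host, database, credentials, and table name below are placeholders.

import redshift_connector

# Placeholder connection values; "my_datashare_db" is assumed to be a database
# created from a datashare on the consumer cluster or serverless workgroup.
with redshift_connector.connect(
    host="consumer-workgroup.012345678901.us-east-1.redshift-serverless.amazonaws.com",
    database="my_datashare_db",
    user="awsuser",
    password="my_password",
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("select * from public.shared_table limit 10")
        print(cursor.fetchall())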


redshift_connector/core.py

Lines changed: 36 additions & 1 deletion
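
Before the diff, a hedged illustration (not part of the commit) of how callers observe the property added below: after connecting, conn.is_single_database_metadata reports whether metadata calls stay scoped to the current database, and it is always False when the server reports external_database=on. The db_kwargs values are placeholders.

import redshift_connector

db_kwargs = {"host": "examplecluster.example.com", "database": "dev", "user": "awsuser", "password": "my_password"}

with redshift_connector.connect(**db_kwargs) as conn:
    if conn.is_single_database_metadata:
        # metadata methods query local catalog views (e.g. pg_catalog.pg_namespace)
        print("single-database metadata")
    else:
        # metadata methods query cross-database / cross-datashare views (e.g. SVV_ALL_SCHEMAS)
        print("multi-database metadata")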
@@ -853,9 +853,44 @@ def _is_multi_databases_catalog_enable_in_server(self: "Connection") -> bool:
         # if we don't receive this param from the server, we do not support
         return False
 
+    @property
+    def _is_cross_datasharing_enable_in_server(self: "Connection") -> bool:
+        """
+        Returns True if cross datasharing is enabled in the server. Returns False if disabled or not received.
+        :return:
+        :rtype:
+        """
+        cross_datasharing_enable_in_server: bool = False
+
+        for parameter in self.parameter_statuses:
+            if parameter[0] == b"external_database":
+                if parameter[1] == b"on":
+                    cross_datasharing_enable_in_server = True
+                elif parameter[1] == b"off":
+                    cross_datasharing_enable_in_server = False
+                else:
+                    raise InterfaceError(
+                        "Protocol error. Session setup failed. Invalid value of external_database parameter. Only on/off are valid values"
+                    )
+                break
+        return cross_datasharing_enable_in_server
+
     @property
     def is_single_database_metadata(self):
-        return self._database_metadata_current_db_only or not self._is_multi_databases_catalog_enable_in_server
+        """
+        Returns True if cross datasharing is not enabled and either the ``database_metadata_current_db_only``
+        connection parameter is enabled or the server does not support the multi-database catalog.
+        Returns False if cross datasharing is enabled, or if the connection parameter is disabled and the
+        server supports the multi-database catalog.
+        :return:
+        :rtype:
+        """
+        # for cross datasharing we always return False
+        if self._is_cross_datasharing_enable_in_server:
+            return False
+
+        else:
+            return self._database_metadata_current_db_only or not self._is_multi_databases_catalog_enable_in_server
 
     def handle_ERROR_RESPONSE(self: "Connection", data, ps):
         """

redshift_connector/cursor.py

Lines changed: 1 addition & 1 deletion
@@ -542,7 +542,7 @@ def __is_valid_table(self: "Cursor", table: str) -> bool:
         if len(split_table_name) > 2:
             return False
 
-        q: str = "select 1 from information_schema.tables where table_name = ?"
+        q: str = "select 1 from pg_catalog.svv_all_tables where table_name = ?"
 
         temp = self.paramstyle
         self.paramstyle = DbApiParamstyle.QMARK.value
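
A hedged sketch of the effect of this one-line change (not part of the commit): pg_catalog.svv_all_tables also lists tables reachable through datashares and other databases, whereas information_schema.tables only covers the local database, so table validation no longer rejects shared tables. Connection values and the table name are placeholders.

import redshift_connector

with redshift_connector.connect(
    host="examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com",
    database="consumer_db",
    user="awsuser",
    password="my_password",
) as conn:
    with conn.cursor() as cursor:
        # mirror the validation query now used by __is_valid_table
        cursor.paramstyle = "qmark"
        cursor.execute("select 1 from pg_catalog.svv_all_tables where table_name = ?", ("shared_table",))
        print("table visible" if cursor.fetchone() else "table not found")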

test/conftest.py

Lines changed: 24 additions & 0 deletions
@@ -130,6 +130,30 @@ def serverless_cname_db_kwargs() -> typing.Dict[str, typing.Union[str, bool]]:
 
     return db_connect
 
+@pytest.fixture(scope="class")
+def ds_consumer_db_kwargs() -> typing.Dict[str, str]:
+    db_connect = {
+        "database": conf.get("redshift-ds-consumer", "database", fallback="mock_database"),
+        "host": conf.get("redshift-ds-consumer", "host", fallback="cname.mytest.com"),
+        "user": conf.get("redshift-ds-consumer", "user", fallback="mock_user"),
+        "password": conf.get("redshift-ds-consumer", "password", fallback="mock_password"),
+        "extra": conf.get("redshift-ds-consumer", "extra", fallback="mock_extra"),
+    }
+
+    return db_connect
+
+
+@pytest.fixture(scope="class")
+def ds_producer_db_kwargs() -> typing.Dict[str, str]:
+    db_connect = {
+        "database": conf.get("redshift-ds-producer", "database", fallback="mock_database"),
+        "host": conf.get("redshift-ds-producer", "host", fallback="cname.mytest.com"),
+        "user": conf.get("redshift-ds-producer", "user", fallback="mock_user"),
+        "password": conf.get("redshift-ds-producer", "password", fallback="mock_password"),
+    }
+
+    return db_connect
+
 
 @pytest.fixture(scope="class")
 def okta_idp() -> typing.Dict[str, typing.Union[str, bool, int]]:
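
A hedged sketch (not part of the commit) of how these fixtures might be consumed in a datasharing test. It assumes the [redshift-ds-consumer] section of config.ini is populated, and that the "extra" key carries test-only data rather than a redshift_connector.connect() argument, so it is stripped before connecting.

import redshift_connector

def test_ds_consumer_connects(ds_consumer_db_kwargs) -> None:
    # drop the test-only "extra" entry before passing the dict to connect()
    connect_kwargs = {k: v for k, v in ds_consumer_db_kwargs.items() if k != "extra"}
    with redshift_connector.connect(**connect_kwargs) as conn:
        with conn.cursor() as cursor:
            cursor.execute("select 1")
            assert cursor.fetchone()[0] == 1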

test/integration/metadata/test_list_catalog.py

Lines changed: 112 additions & 30 deletions
@@ -6,7 +6,17 @@
 
 import redshift_connector
 
-# from warnings import filterwarnings
+# Pre-reqs
+# These tests require the following:
+# - A single Redshift instance OR two Redshift instances configured for datasharing as consumer and producer.
+# - A config.ini entry for [ci-cluster] populated with the host and port of the single Redshift instance, or of the
+#   producer instance if you want to test datasharing metadata methods.
+#
+# How to use this test file
+# The included tests can be run to ensure the expected Redshift system tables are queried. Running these tests via the
+# CLI and using tee lets you compare and contrast the result sets returned by metadata APIs on multiple clusters with
+# different patch versions. Please note the result set output is only useful if the same data is accessible to the
+# database user in all test situations.
 
 
 conf: configparser.ConfigParser = configparser.ConfigParser()
@@ -145,23 +155,27 @@ def test_get_schemas(mocker, _input, db_kwargs) -> None:
     database_metadata_current_db_only_val, _args = _input
     db_kwargs["database_metadata_current_db_only"] = database_metadata_current_db_only_val
     with redshift_connector.connect(**db_kwargs) as conn:
-        assert conn.is_single_database_metadata is database_metadata_current_db_only_val
-
         with conn.cursor() as cursor:
            spy = mocker.spy(cursor, "execute")
            result: typing.Tuple = cursor.get_schemas(**_args)
-            # ensure query was executed with arguments passed to get_schemas
+            print(result)
+            # ensure execute was called
            assert spy.called
-            assert spy.call_count == 1
-
-            if _args["schema_pattern"] is not None:  # should be in parameterized portion
-                assert _args["schema_pattern"] in spy.call_args[0][1]
+            assert spy.call_count == 1  # call in get_schemas()
 
+            # ensure execute was called with the catalog value in the prepared statement
            if _args["catalog"] is not None:
                assert _args["catalog"] in spy.call_args[0][0]
 
-            assert len(result) > 0, print(spy.call_args, "\n", result)
-            assert len(result[0]) == 2
+            # ensure execute was called with below bind parameters
+            if _args["schema_pattern"] is not None:
+                assert _args["schema_pattern"] in spy.call_args[0][1]
+
+            # assert the executed query text references the expected catalog table
+            if conn.is_single_database_metadata:
+                assert "FROM pg_catalog.pg_namespace" in spy.call_args[0][0]
+            else:
+                assert "FROM PG_CATALOG.SVV_ALL_SCHEMAS" in spy.call_args[0][0]
 
 
 def get_tables_test_data() -> typing.List[typing.Optional[typing.Tuple[bool, typing.Dict[str, typing.Any]]]]:
@@ -272,35 +286,54 @@ def test_get_tables(mocker, _input, db_kwargs) -> None:
     database_metadata_current_db_only_val, _args = _input
     db_kwargs["database_metadata_current_db_only"] = database_metadata_current_db_only_val
     with redshift_connector.connect(**db_kwargs) as conn:
-        assert conn.is_single_database_metadata is database_metadata_current_db_only_val
-
         with conn.cursor() as cursor:
            spy = mocker.spy(cursor, "execute")
            result: typing.Tuple = cursor.get_tables(**_args)
            print(result)
-            # ensure query was executed with arguments passed to get_schemas
+            # ensure execute was called
            assert spy.called
 
-            if _args["schema_pattern"] is not None and database_metadata_current_db_only_val:
+            if _args["schema_pattern"] is not None and conn.is_single_database_metadata:
                assert spy.call_count == 2  # call in __schema_pattern_match(), get_tables()
            else:
-                assert spy.call_count == 1
-
-            if _args["schema_pattern"] is not None:  # should be in parameterized portion
-                print(spy.call_args)
-                print(spy.call_args[0][1])
-                print(_args, database_metadata_current_db_only_val, TestListCatalog.test_schema)
-                assert _args["schema_pattern"] in spy.call_args[0][1], print(spy.call_args)
+                assert spy.call_count == 1  # call in get_tables()
 
+            # ensure execute was called with the catalog value in the prepared statement
            if _args["catalog"] is not None:
                assert _args["catalog"] in spy.call_args[0][0]
 
+            # ensure execute was called with below bind parameters
            for arg in (_args["schema_pattern"], _args["table_name_pattern"]):
                if arg is not None:
                    assert arg in spy.call_args[0][1]
 
-            assert len(result) > 0, print(spy.call_args, "\n", result)
-            assert len(result[0]) == 10
+            # we cannot easily know which schema pattern matches in the Python driver, so
+            # we check that the query references one of a few possible tables, based on
+            # whether is_single_database_metadata is True or False
+
+            possible_not_ds_tables = (
+                "FROM svv_tables",  # universal
+                "FROM pg_catalog.pg_namespace n, pg_catalog.pg_class",  # local
+                "FROM svv_external_tables",  # external
+            )
+            possible_ds_tables = (
+                "FROM PG_CATALOG.SVV_ALL_TABLES",  # universal
+                "FROM pg_catalog.pg_namespace n, pg_catalog.pg_class",  # local
+                "FROM svv_external_tables",  # external
+            )
+
+            if conn.is_single_database_metadata:
+                for table in possible_not_ds_tables:
+                    if table in spy.call_args[0][0]:
+                        assert 1 == 1
+                        return
+                assert 1 == 0, spy.call_args[0][0]
+            else:
+                for table in possible_ds_tables:
+                    if table in spy.call_args[0][0]:
+                        assert 1 == 1
+                        return
+                assert 1 == 0, spy.call_args[0][0]
 
 
 def get_columns_test_data() -> typing.List[typing.Tuple[bool, typing.Dict[str, typing.Optional[str]]]]:
@@ -482,19 +515,19 @@ def test_get_columns(mocker, _input, db_kwargs) -> None:
     database_metadata_current_db_only_val, _args = _input
     db_kwargs["database_metadata_current_db_only"] = database_metadata_current_db_only_val
     with redshift_connector.connect(**db_kwargs) as conn:
-        assert conn.is_single_database_metadata is database_metadata_current_db_only_val
-
         with conn.cursor() as cursor:
            spy = mocker.spy(cursor, "execute")
            result: typing.Tuple = cursor.get_columns(**_args)
-            # ensure query was executed with arguments passed to get_schemas
+            print(result)
+            # ensure execute was called
            assert spy.called
 
-            if _args["schema_pattern"] is not None and database_metadata_current_db_only_val:
+            if _args["schema_pattern"] is not None and conn.is_single_database_metadata:
                assert spy.call_count == 2  # call in __schema_pattern_match(), get_columns()
            else:
-                assert spy.call_count == 1
+                assert spy.call_count == 1  # call in get_columns()
 
+            # ensure execute was called with below bind parameters
            for arg in (
                _args["catalog"],
                _args["schema_pattern"],
@@ -504,5 +537,54 @@ def test_get_columns(mocker, _input, db_kwargs) -> None:
                if arg is not None:
                    assert arg in spy.call_args[0][0]
 
-            assert len(result) > 0, print(spy.call_args, "\n", result)
-            assert len(result[0]) == 24
+            # we cannot easily know which schema pattern matches in the Python driver, so
+            # we check that the query references one of a few possible tables, based on
+            # whether is_single_database_metadata is True or False
+
+            possible_not_ds_tables = (
+                "FROM svv_columns",  # universal
+                "FROM pg_catalog.pg_namespace",  # local
+                "FROM svv_external_columns",  # external
+            )
+            possible_ds_tables = (
+                "FROM PG_CATALOG.svv_all_columns",  # universal
+                "FROM pg_catalog.pg_namespace",  # local
+                "FROM svv_external_columns",  # external
+            )
+
+            if conn.is_single_database_metadata:
+                for table in possible_not_ds_tables:
+                    if table in spy.call_args[0][0]:
+                        assert 1 == 1
+                        return
+                assert 1 == 0, spy.call_args[0][0]
+            else:
+                for table in possible_ds_tables:
+                    if table in spy.call_args[0][0]:
+                        assert 1 == 1
+                        return
+                assert 1 == 0, spy.call_args[0][0]
+
+
+def get_catalogs_test_data() -> typing.List[bool]:
+    return [True, False]
+
+
+@pytest.mark.skip(reason="manual")
+@pytest.mark.parametrize("_input", get_catalogs_test_data())
+def test_get_catalogs(mocker, _input, db_kwargs) -> None:
+    database_metadata_current_db_only_val = _input
+    db_kwargs["database_metadata_current_db_only"] = database_metadata_current_db_only_val
+    with redshift_connector.connect(**db_kwargs) as conn:
+        with conn.cursor() as cursor:
+            spy = mocker.spy(cursor, "execute")
+            result: typing.Tuple = cursor.get_catalogs()
+            print(result)
+            # ensure execute was called
+            assert spy.called
+            assert spy.call_count == 1  # call in get_catalogs()
+
+            if conn.is_single_database_metadata:
+                assert "current_database()" in spy.call_args[0][0]
+            else:
+                assert "PG_CATALOG.SVV_REDSHIFT_DATABASES" in spy.call_args[0][0]
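
For reference, a hedged sketch (not part of the test file) of exercising the same metadata APIs by hand against a datashare consumer. Connection values and object names are placeholders, only parameters exercised by the tests above are passed, and the comments reflect the catalog views the tests expect when cross datasharing is enabled on the server.

import redshift_connector

with redshift_connector.connect(
    host="consumer-cluster.example.amazonaws.com",
    database="consumer_db",
    user="awsuser",
    password="my_password",
    database_metadata_current_db_only=False,
) as conn:
    with conn.cursor() as cursor:
        print(cursor.get_catalogs())  # expected to query PG_CATALOG.SVV_REDSHIFT_DATABASES
        print(cursor.get_schemas(catalog="consumer_db", schema_pattern="public"))  # SVV_ALL_SCHEMAS
        print(cursor.get_tables(catalog="consumer_db", schema_pattern="public", table_name_pattern="%"))  # SVV_ALL_TABLES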

test/unit/test_connection.py

Lines changed: 61 additions & 11 deletions
@@ -296,26 +296,76 @@ def test_is_multidatabases_catalog_enable_in_server(_input) -> None:
     assert mock_connection._is_multi_databases_catalog_enable_in_server == exp_val
 
 
-test_is_single_database_metadata_data: typing.List[typing.Tuple[typing.Optional[str], bool, bool]] = [
-    ("on", True, True),
-    ("on", False, False),
-    ("off", True, True),
-    ("off", False, True),
-    (None, True, True),
-    (None, False, True),
+test_is_cross_datasharing_enable_in_server_data: typing.List[typing.Tuple[typing.Optional[str], bool]] = [
+    ("on", True),
+    ("off", False),
+    (None, False),
+]
+
+
+@pytest.mark.parametrize("_input", test_is_cross_datasharing_enable_in_server_data)
+def test_is_cross_datasharing_enable_in_server(_input):
+    param_status, exp_val = _input
+    mock_connection: Connection = Connection.__new__(Connection)
+    mock_connection.parameter_statuses: deque = deque()
+
+    if param_status is not None:
+        mock_connection.parameter_statuses.append((b"external_database", param_status.encode()))
+
+    assert mock_connection._is_cross_datasharing_enable_in_server == exp_val
+
+
+def test_is_cross_datasharing_enable_in_server_raises_on_invalid():
+    param_status = "garbage"
+    mock_connection: Connection = Connection.__new__(Connection)
+    mock_connection.parameter_statuses: deque = deque()
+
+    if param_status is not None:
+        mock_connection.parameter_statuses.append((b"external_database", param_status.encode()))
+
+    with pytest.raises(
+        InterfaceError,
+        match="Protocol error. Session setup failed. Invalid value of external_database parameter. Only on/off are valid values",
+    ):
+        mock_connection._is_cross_datasharing_enable_in_server
+
+
+test_is_single_database_metadata_data: typing.List[
+    typing.Tuple[typing.Optional[str], typing.Optional[str], bool, bool]
+] = [
+    ("on", "on", True, False),
+    ("on", "on", False, False),
+    ("on", None, True, True),
+    ("on", None, False, False),
+    (None, "on", True, False),
+    (None, "on", False, False),
+    ("off", "on", True, False),
+    ("off", "on", False, False),
+    ("on", "off", True, True),
+    ("on", "off", False, False),
+    ("off", "off", False, True),
+    ("off", "off", True, True),
+    ("off", None, False, True),
+    ("off", None, True, True),
+    (None, "off", False, True),
+    (None, "off", True, True),
+    (None, None, True, True),
+    (None, None, False, True),
 ]
 
 
 @pytest.mark.parametrize("_input", test_is_single_database_metadata_data)
-def test_is_single_database_metadata(_input) -> None:
-    param_status, database_metadata_current_db_only_val, exp_val = _input
+def test_is_single_database_metadata(_input):
+    datashare_enabled, dsw_enabled, database_metadata_current_db_only_val, exp_val = _input
 
     mock_connection: Connection = Connection.__new__(Connection)
     mock_connection.parameter_statuses = deque()
     mock_connection._database_metadata_current_db_only = database_metadata_current_db_only_val
 
-    if param_status is not None:
-        mock_connection.parameter_statuses.append((b"datashare_enabled", param_status.encode()))
+    if datashare_enabled is not None:
+        mock_connection.parameter_statuses.append((b"datashare_enabled", datashare_enabled.encode()))
+    if dsw_enabled is not None:
+        mock_connection.parameter_statuses.append((b"external_database", dsw_enabled.encode()))
 
     assert mock_connection.is_single_database_metadata == exp_val
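
To summarize the truth table above, a hedged pure-Python sketch of the decision logic these cases exercise; it mirrors, but is not, the Connection property implementation.

import typing

def is_single_database_metadata(
    datashare_enabled: typing.Optional[str],   # server's datashare_enabled parameter status
    external_database: typing.Optional[str],   # server's external_database parameter status
    database_metadata_current_db_only: bool,
) -> bool:
    # cross datasharing forces multi-database metadata regardless of the other inputs
    if external_database == "on":
        return False
    multi_db_catalog_supported = datashare_enabled == "on"
    return database_metadata_current_db_only or not multi_db_catalog_supported

# e.g. is_single_database_metadata("on", None, False) -> False, matching the ("on", None, False, False) case above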
