@@ -526,16 +526,17 @@ def get_test_db_pairs():
 for source_db, source_type_categories, target_db, target_type_categories in get_test_db_pairs():
     for type_category, source_types in source_type_categories.items():  # int, datetime, ..
         for source_type in source_types:
-            for target_type in target_type_categories[type_category]:
-                type_pairs.append(
-                    (
-                        source_db,
-                        target_db,
-                        source_type,
-                        target_type,
-                        type_category,
+            if type_category in target_type_categories:  # only cross-compatible types
+                for target_type in target_type_categories[type_category]:
+                    type_pairs.append(
+                        (
+                            source_db,
+                            target_db,
+                            source_type,
+                            target_type,
+                            type_category,
+                        )
                     )
-                )


 def sanitize(name):
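The guard added above changes the failure mode of the pairing loop: the old code indexed target_type_categories[type_category] unconditionally and raised a KeyError for categories the target database lacks, while the new code silently skips them. A minimal runnable sketch with made-up category maps (the dict contents are hypothetical, not from this suite):

    # Categories present only on the source side now produce no pairs.
    source_type_categories = {"int": ["int", "bigint"], "uuid": ["uuid"]}
    target_type_categories = {"int": ["integer"]}

    type_pairs = []
    for type_category, source_types in source_type_categories.items():
        for source_type in source_types:
            if type_category in target_type_categories:  # only cross-compatible types
                for target_type in target_type_categories[type_category]:
                    type_pairs.append((source_type, target_type, type_category))

    print(type_pairs)  # [('int', 'integer', 'int'), ('bigint', 'integer', 'int')]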
@@ -603,8 +604,15 @@ def _insert_to_table(conn, table_path, values, coltype):
     elif isinstance(conn, db.BigQuery) and coltype == "datetime":
         values = [(i, Code(f"cast(timestamp '{sample}' as datetime)")) for i, sample in values]

-    if isinstance(conn, db.Redshift) and coltype == "json":
-        values = [(i, Code(f"JSON_PARSE('{sample}')")) for i, sample in values]
+    elif isinstance(conn, db.Redshift) and coltype in ("json", "jsonb"):
+        values = [(i, Code(f"JSON_PARSE({sample})")) for i, sample in values]
+    elif isinstance(conn, db.PostgreSQL) and coltype in ("json", "jsonb"):
+        values = [(i, Code(
+            "'{}'".format(
+                (json.dumps(sample) if isinstance(sample, (dict, list)) else sample)
+                .replace('\'', '\'\'')
+            )
+        )) for i, sample in values]

     insert_rows_in_batches(conn, tbl, values, columns=["id", "col"])
     conn.query(commit)
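Two things change in this hunk: the Redshift branch becomes an elif, accepts jsonb, and no longer wraps the sample in quotes inside the f-string (suggesting the samples now arrive pre-quoted), and a new PostgreSQL branch builds a quoted SQL literal by hand. A standalone sketch of that literal-building expression, with a hypothetical helper name:

    import json

    def pg_json_literal(sample):
        # Dicts and lists are serialized to JSON text; plain strings are
        # assumed to already hold valid JSON.
        text = json.dumps(sample) if isinstance(sample, (dict, list)) else sample
        # Double embedded single quotes so the value survives as a SQL string literal.
        return "'{}'".format(text.replace('\'', '\'\''))

    print(pg_json_literal({"k": "it's"}))  # '{"k": "it''s"}'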
@@ -627,9 +635,10 @@ def _create_table_with_indexes(conn, table_path, type_):
     else:
         conn.query(tbl.create())

-    if conn.dialect.SUPPORTS_INDEXES:
-        (index_id,) = table_path
+    (index_id,) = table_path
+    if conn.dialect.SUPPORTS_INDEXES and type_ not in ('json', 'jsonb', 'array', 'struct'):
         conn.query(f"CREATE INDEX xa_{index_id} ON {table_name}({quote('id')}, {quote('col')})")
+    if conn.dialect.SUPPORTS_INDEXES:
         conn.query(f"CREATE INDEX xb_{index_id} ON {table_name}({quote('id')})")

     conn.query(commit)
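Splitting the SUPPORTS_INDEXES block lets the id-only xb index be created even when the composite xa index must be skipped. The skip list matters because some engines cannot btree-index these types directly; PostgreSQL, for instance, rejects CREATE INDEX on a json column with "no default operator class for access method btree". A sketch of the resulting control flow (function name hypothetical, attribute names as in the hunk):

    NON_INDEXABLE = ('json', 'jsonb', 'array', 'struct')

    def create_indexes(conn, table_name, index_id, type_, quote):
        # Composite index only for types the engine can index directly.
        if conn.dialect.SUPPORTS_INDEXES and type_ not in NON_INDEXABLE:
            conn.query(f"CREATE INDEX xa_{index_id} ON {table_name}({quote('id')}, {quote('col')})")
        # The id-only index is safe regardless of the payload column's type.
        if conn.dialect.SUPPORTS_INDEXES:
            conn.query(f"CREATE INDEX xb_{index_id} ON {table_name}({quote('id')})")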
@@ -724,9 +733,11 @@ def test_types(self, source_db, target_db, source_type, target_type, type_catego
         checksum_duration = time.monotonic() - start
         expected = []
         self.assertEqual(expected, diff)
-        self.assertEqual(
-            0, differ.stats.get("rows_downloaded", 0)
-        )  # This may fail if the hash is different, but downloaded values are equal
+
+        # For fuzzily diffed types, some rows can be downloaded for local comparison. This happens
+        # when hashes are different but the essential payload is not; e.g. due to json serialization.
+        if not {source_type, target_type} & {'json', 'jsonb', 'array', 'struct'}:
+            self.assertEqual(0, differ.stats.get("rows_downloaded", 0))

         # This section downloads all rows to ensure that Python agrees with the
         # database, in terms of comparison.
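The relaxed assertion keys off a set intersection, so the download allowance applies when either side of the pair is a fuzzily compared type. A tiny sketch of the predicate (helper name hypothetical):

    FUZZY_TYPES = {'json', 'jsonb', 'array', 'struct'}

    def hashes_must_match(source_type, target_type):
        # True only when neither side is a type whose serialization,
        # and therefore hash, may differ between databases.
        return not {source_type, target_type} & FUZZY_TYPES

    assert hashes_must_match('int', 'bigint')
    assert not hashes_must_match('json', 'varchar')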