@@ -50,7 +50,10 @@ def tbl_y(data):
     snapshots = init_replication(
         slot_name=slot_name,
         pub_name=pub_name,
-        table_names=None,  # will initialize schema replication and get list of snapshots from sql_database
+        table_names=[
+            "tbl_x",
+            "tbl_y",
+        ],  # will initialize schema replication and get list of snapshots from sql_database, set to None if creds have superuser privileges
         schema_name=src_pl.dataset_name,
         persist_snapshots=True,
     )
@@ -91,14 +94,25 @@ def tbl_y(data):
     assert_loaded_data(dest_pl, "tbl_y", ["id_y", "val_y"], exp_tbl_y, "id_y")
 
     # add new table and load data, we should receive it in publication slot
-    # because we track the whole schema
+    # after adding it to the publication
     @dlt.resource(write_disposition="append", primary_key="id_z")
     def tbl_z(data):
         yield data
 
-    src_pl.run(tbl_z([{"id_z": 2, "val_x": "bar"}, {"id_z": 3}]))
+    # create the table and add pk first
+    src_pl.run(tbl_z([{"id_z": 1, "val_x": "initial"}]))
     add_pk(src_pl.sql_client, "tbl_z", "id_z")
-    # for some reason I need to add data to see that PK was added
+
+    # add the new table to the existing publication before inserting data we want to capture
+    init_replication(
+        slot_name=slot_name,
+        pub_name=pub_name,
+        table_names=["tbl_x", "tbl_y", "tbl_z"],
+        schema_name=src_pl.dataset_name,
+    )
+
+    # now insert the data that should be captured in replication
+    src_pl.run(tbl_z([{"id_z": 2, "val_x": "bar"}, {"id_z": 3}]))
     src_pl.run(tbl_z([{"id_z": 4, "val_x": "bar"}, {"id_z": 5}]))
 
     info = dest_pl.run(replication_resource(slot_name, pub_name))
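
The hunk above no longer relies on whole-schema tracking: it re-runs init_replication with the extended table list before inserting the rows it wants to capture. Outside the test fixtures, the same pattern looks roughly like the sketch below. It reuses init_replication and replication_resource exactly as the test invokes them, but the import paths, pipeline names, slot/publication names, and the explicit ALTER TABLE primary-key statement are illustrative assumptions, not taken from this PR.

# Sketch only: pipeline and replication names below are made up; import paths
# depend on how the pg_replication verified source is vendored into a project.
import dlt
from pg_replication import replication_resource
from pg_replication.helpers import init_replication

src_pl = dlt.pipeline(pipeline_name="source_pl", destination="postgres", dataset_name="src_data")
dest_pl = dlt.pipeline(pipeline_name="dest_pl", destination="duckdb")
slot_name, pub_name = "my_slot", "my_pub"

# the new table must exist and have a primary key (replica identity) before it
# can usefully join the publication, so create it with a throwaway row first
src_pl.run([{"id_z": 1, "val_x": "initial"}], table_name="tbl_z")
with src_pl.sql_client() as c:
    c.execute_sql(f"ALTER TABLE {c.make_qualified_table_name('tbl_z')} ADD PRIMARY KEY (id_z);")

# re-running init_replication with the extended table list adds tbl_z to the
# existing publication; only rows inserted afterwards are captured in the slot
init_replication(
    slot_name=slot_name,
    pub_name=pub_name,
    table_names=["tbl_x", "tbl_y", "tbl_z"],
    schema_name=src_pl.dataset_name,
)

src_pl.run([{"id_z": 2, "val_x": "bar"}], table_name="tbl_z")
dest_pl.run(replication_resource(slot_name, pub_name))

In the test, this create-then-extend ordering is what guarantees the later tbl_z rows arrive through the replication slot rather than being missed by the publication.
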
@@ -461,6 +475,96 @@ def items(data):
     )
 
 
+@pytest.mark.parametrize("init_load", [True, False])
+@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
+def test_timestamp_without_tz_and_json(
+    src_config: Tuple[dlt.Pipeline, str, str],
+    init_load: bool,
+    destination_name: str,
+) -> None:
+    """Test that PostgreSQL OID 114 (json) and OID 1114 (timestamp without timezone) are correctly mapped.
+
+    This test verifies the fix for the issue where the pg_replication source didn't handle these
+    data types correctly, causing them to default to the 'text' type instead of proper type mapping.
+    """
+    data = deepcopy({k: TABLE_ROW_ALL_DATA_TYPES[k] for k in ("col4", "col9")})
+    data["col0"] = deepcopy(data["col4"])  # same value as col4
+
+    # resource to load data into postgres source table
+    @dlt.resource(columns={"col9": {"data_type": "json"}})
+    def items(data):
+        yield data
+
+    src_pl, slot_name, pub_name = src_config
+
+    # create postgres table with the three test columns
+    src_pl.run(items(data))
+
+    # ensure source has expected col types for testing:
+    # col0: timestamp without time zone (OID 1114)
+    # col4: timestamp with time zone (OID 1184)
+    # col9: jsonb (OID 3802)
+    with src_pl.sql_client() as c:
+        query = f"""
+            SELECT data_type
+            FROM information_schema.columns
+            WHERE table_schema = '{src_pl.dataset_name}'
+                AND table_name = 'items'
+                AND column_name = %s;
+        """
+        col0_type = c.execute_sql(query, ("col0"))[0][0]
+        assert "timestamp with time zone" == col0_type
+        # we need to alter col0 to timestamp without time zone
+        qual_name = src_pl.sql_client().make_qualified_table_name("items")
+        c.execute_sql(
+            f"ALTER TABLE {qual_name} ALTER COLUMN col0 TYPE timestamp WITHOUT TIME ZONE USING (col0 AT TIME ZONE 'UTC');"
+        )
+        col0_type = c.execute_sql(query, ("col0"))[0][0]
+        assert "timestamp without time zone" == col0_type
+
+        col4_type = c.execute_sql(query, ("col4"))[0][0]
+        assert "timestamp with time zone" == col4_type
+
+        # since dlt's PostgresTypeMapper always loads json as jsonb,
+        # we need to alter the type to test json
+        c.execute_sql(
+            f"ALTER TABLE {qual_name} ALTER COLUMN col9 TYPE json USING col9::json;"
+        )
+        col9_type = c.execute_sql(query, ("col9"))[0][0]
+        assert "json" == col9_type
+
+    # load to destination
+    dest_pl = dlt.pipeline(
+        pipeline_name="dest_pl",
+        destination=destination_name,
+    )
+
+    # create snapshot resource based on init_load parameter
+    snapshot = init_replication(
+        slot_name=slot_name,
+        pub_name=pub_name,
+        schema_name=src_pl.dataset_name,
+        table_names="items",
+        persist_snapshots=init_load,
+    )
+    changes = replication_resource(slot_name, pub_name)
+
+    if init_load:
+        info = dest_pl.run(snapshot)
+        assert_load_info(info)
+
+    src_pl.run(items(data))
+    info = dest_pl.run(changes)
+    assert_load_info(info)
+
+    col_schemas = dest_pl.default_schema.get_table_columns("items")
+    assert "timestamp" == col_schemas["col0"]["data_type"]
+    assert not col_schemas["col0"].get("timezone")
+    assert "timestamp" == col_schemas["col4"]["data_type"]
+    assert col_schemas["col4"].get("timezone") is True
+    assert "json" == col_schemas["col9"]["data_type"]
+
+
 @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
 def test_unmapped_data_types(
     src_config: Tuple[dlt.Pipeline, str, str], destination_name: str
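
For reference, the PostgreSQL catalog OIDs exercised by the new test are 114 (json), 1114 (timestamp without time zone), 1184 (timestamp with time zone), and 3802 (jsonb). The snippet below merely restates the destination-side expectations asserted at the end of test_timestamp_without_tz_and_json as a lookup table; the constant names and dict shape are hypothetical and do not mirror the source's internal type-mapping code.

# Illustrative only: how the OIDs covered by the new test are expected to
# surface as dlt column schemas. Names and structure are hypothetical, not the
# pg_replication source's actual decoder tables.
JSON_OID = 114          # json
TIMESTAMP_OID = 1114    # timestamp without time zone
TIMESTAMPTZ_OID = 1184  # timestamp with time zone
JSONB_OID = 3802        # jsonb

EXPECTED_COLUMN_SCHEMA = {
    JSON_OID: {"data_type": "json"},
    JSONB_OID: {"data_type": "json"},
    TIMESTAMP_OID: {"data_type": "timestamp", "timezone": False},
    TIMESTAMPTZ_OID: {"data_type": "timestamp", "timezone": True},
}
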