@@ -36,22 +36,29 @@ def __init__(self, connection_string, logger=None):
3636 def can_handle_connection_string (connection_string ):
3737 return MsSqlDataSource .__connection_string_regex_match (connection_string ) is not None
3838
39- @staticmethod
40- def __connection_string_regex_match (connection_string ):
41- return re .match (MsSqlDataSource .MSSQL_STRING_REGEX , connection_string )
39+ def get_table_info (self , table_config , last_known_sync_version ):
40+ columns_in_database = self .__get_table_columns (table_config )
41+ change_tracking_info = self .__get_change_tracking_info (table_config , last_known_sync_version )
42+ source_table_info = SourceTableInfo (columns_in_database , change_tracking_info )
43+ return source_table_info
4244
43- @staticmethod
44- def connection_string_prefix ():
45- return 'mssql+pyodbc://'
45+ @prevent_senstive_data_logging
46+ def get_table_data_frame (self , table_config , columns , batch_config , batch_tracker , batch_key_tracker ,
47+ full_refresh , change_tracking_info ):
48+ sql = self .__build_select_statement (table_config , columns , batch_config , batch_key_tracker ,
49+ full_refresh , change_tracking_info )
50+
51+ self .logger .debug (f"Starting read of SQL Statement: \n { sql } " )
52+ data_frame = pandas .read_sql_query (sql , self .database_engine )
53+ self .logger .debug ("Completed read" )
54+
55+ batch_tracker .extract_completed_successfully (len (data_frame ))
56+
57+ return data_frame
4658
4759 @staticmethod
48- def prefix_column (column_name , full_refresh , primary_key_column_names ):
49- if not isinstance (primary_key_column_names , (list , tuple )):
50- raise TypeError (f"Argument 'primary_key_column_names' must be a list or tuple" )
51- if column_name in primary_key_column_names and not full_refresh :
52- return f"{ MsSqlDataSource .CHANGE_TABLE_ALIAS } .{ column_name } "
53- else :
54- return f"{ MsSqlDataSource .SOURCE_TABLE_ALIAS } .{ column_name } "
60+ def __connection_string_regex_match (connection_string ):
61+ return re .match (MsSqlDataSource .MSSQL_STRING_REGEX , connection_string )
5562
5663 def __create_connection_with_failover (self ):
5764 conn_string_data = MsSqlDataSource .__connection_string_regex_match (self .connection_string )
@@ -83,41 +90,6 @@ def __create_connection_with_failover(self):
8390 return pyodbc .connect (dsn , server = failover )
8491 raise e
8592
86- def __build_select_statement (self , table_config , columns , batch_config , batch_key_tracker , full_refresh ,
87- change_tracking_info ):
88- column_array = list (
89- map (lambda cfg : self .prefix_column (cfg ['source_name' ], full_refresh , table_config ['primary_keys' ]),
90- columns ))
91- column_names = ", " .join (column_array )
92-
93- if full_refresh :
94- select_sql = f"SELECT TOP ({ batch_config ['size' ]} ) { column_names } "
95- from_sql = f"FROM { table_config ['schema' ]} .{ table_config ['name' ]} AS { MsSqlDataSource .SOURCE_TABLE_ALIAS } "
96- where_sql = f"WHERE { self .__build_where_clause (batch_key_tracker , MsSqlDataSource .SOURCE_TABLE_ALIAS )} "
97- order_by_sql = "ORDER BY " + f", { MsSqlDataSource .SOURCE_TABLE_ALIAS } ." .join (table_config ['primary_keys' ])
98- else :
99- select_sql = f"SELECT TOP ({ batch_config ['size' ]} ) { column_names } , " \
100- f"{ MsSqlDataSource .CHANGE_TABLE_ALIAS } .SYS_CHANGE_VERSION" \
101- f" AS { Providers .AuditColumnsNames .CHANGE_VERSION } , " \
102- f"CASE { MsSqlDataSource .CHANGE_TABLE_ALIAS } .SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
103- f"END AS { Providers .AuditColumnsNames .IS_DELETED } "
104- from_sql = f"FROM CHANGETABLE(CHANGES" \
105- f" { table_config ['schema' ]} .{ table_config ['name' ]} ," \
106- f" { change_tracking_info .last_sync_version } )" \
107- f" AS { MsSqlDataSource .CHANGE_TABLE_ALIAS } " \
108- f" LEFT JOIN { table_config ['schema' ]} .{ table_config ['name' ]} AS { MsSqlDataSource .SOURCE_TABLE_ALIAS } " \
109- f" ON { self .__build_change_table_on_clause (batch_key_tracker )} "
110- where_sql = f"WHERE { self .__build_where_clause (batch_key_tracker , MsSqlDataSource .CHANGE_TABLE_ALIAS )} "
111- order_by_sql = "ORDER BY " + f", { MsSqlDataSource .CHANGE_TABLE_ALIAS } ." .join (table_config ['primary_keys' ])
112-
113- return f"{ select_sql } \n { from_sql } \n { where_sql } \n { order_by_sql } ;"
114-
115- def get_table_info (self , table_config , last_known_sync_version ):
116- columns_in_database = self .__get_table_columns (table_config )
117- change_tracking_info = self .__get_change_tracking_info (table_config , last_known_sync_version )
118- source_table_info = SourceTableInfo (columns_in_database , change_tracking_info )
119- return source_table_info
120-
12193 def __get_table_columns (self , table_config ):
12294 metadata = MetaData ()
12395 self .logger .debug (f"Reading definition for source table "
@@ -126,20 +98,6 @@ def __get_table_columns(self, table_config):
12698 autoload_with = self .database_engine )
12799 return list (map (lambda column : column .name , table .columns ))
128100
129- @prevent_senstive_data_logging
130- def get_next_data_frame (self , table_config , columns , batch_config , batch_tracker , batch_key_tracker ,
131- full_refresh , change_tracking_info ):
132- sql = self .__build_select_statement (table_config , columns , batch_config , batch_key_tracker ,
133- full_refresh , change_tracking_info )
134-
135- self .logger .debug (f"Starting read of SQL Statement: \n { sql } " )
136- data_frame = pandas .read_sql_query (sql , self .database_engine )
137- self .logger .debug ("Completed read" )
138-
139- batch_tracker .extract_completed_successfully (len (data_frame ))
140-
141- return data_frame
142-
143101 def __get_change_tracking_info (self , table_config , last_known_sync_version ):
144102
145103 if last_known_sync_version is None :
@@ -216,6 +174,43 @@ def __get_change_tracking_info(self, table_config, last_known_sync_version):
216174 return ChangeTrackingInfo (row ["last_sync_version" ], row ["sync_version" ],
217175 row ["force_full_load" ], row ["data_changed_since_last_sync" ])
218176
177+ def __build_select_statement (self , table_config , columns , batch_config , batch_key_tracker , full_refresh ,
178+ change_tracking_info ):
179+ column_array = list (map (lambda cfg : MsSqlDataSource .prefix_column (
180+ cfg ['source_name' ], full_refresh , table_config ['primary_keys' ]), columns ))
181+ column_names = ", " .join (column_array )
182+
183+ if full_refresh :
184+ select_sql = f"SELECT TOP ({ batch_config ['size' ]} ) { column_names } "
185+ from_sql = f"FROM { table_config ['schema' ]} .{ table_config ['name' ]} AS { MsSqlDataSource .SOURCE_TABLE_ALIAS } "
186+ where_sql = f"WHERE { self .__build_where_clause (batch_key_tracker , MsSqlDataSource .SOURCE_TABLE_ALIAS )} "
187+ order_by_sql = "ORDER BY " + f", { MsSqlDataSource .SOURCE_TABLE_ALIAS } ." .join (table_config ['primary_keys' ])
188+ else :
189+ select_sql = f"SELECT TOP ({ batch_config ['size' ]} ) { column_names } , " \
190+ f"{ MsSqlDataSource .CHANGE_TABLE_ALIAS } .SYS_CHANGE_VERSION" \
191+ f" AS { Providers .AuditColumnsNames .CHANGE_VERSION } , " \
192+ f"CASE { MsSqlDataSource .CHANGE_TABLE_ALIAS } .SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
193+ f"END AS { Providers .AuditColumnsNames .IS_DELETED } "
194+ from_sql = f"FROM CHANGETABLE(CHANGES" \
195+ f" { table_config ['schema' ]} .{ table_config ['name' ]} ," \
196+ f" { change_tracking_info .last_sync_version } )" \
197+ f" AS { MsSqlDataSource .CHANGE_TABLE_ALIAS } " \
198+ f" LEFT JOIN { table_config ['schema' ]} .{ table_config ['name' ]} AS { MsSqlDataSource .SOURCE_TABLE_ALIAS } " \
199+ f" ON { self .__build_change_table_on_clause (batch_key_tracker )} "
200+ where_sql = f"WHERE { self .__build_where_clause (batch_key_tracker , MsSqlDataSource .CHANGE_TABLE_ALIAS )} "
201+ order_by_sql = "ORDER BY " + f", { MsSqlDataSource .CHANGE_TABLE_ALIAS } ." .join (table_config ['primary_keys' ])
202+
203+ return f"{ select_sql } \n { from_sql } \n { where_sql } \n { order_by_sql } ;"
204+
205+ @staticmethod
206+ def prefix_column (column_name , full_refresh , primary_key_column_names ):
207+ if not isinstance (primary_key_column_names , (list , tuple )):
208+ raise TypeError (f"Argument 'primary_key_column_names' must be a list or tuple" )
209+ if column_name in primary_key_column_names and not full_refresh :
210+ return f"{ MsSqlDataSource .CHANGE_TABLE_ALIAS } .{ column_name } "
211+ else :
212+ return f"{ MsSqlDataSource .SOURCE_TABLE_ALIAS } .{ column_name } "
213+
219214 @staticmethod
220215 def __build_where_clause (batch_key_tracker , table_alias ):
221216 has_value = False
0 commit comments