 import os
 import json
+import uuid
 import logging
+import hashlib
+
 from modules.BatchDataLoader import BatchDataLoader
 from modules.DestinationTableManager import DestinationTableManager
-from modules.DataLoadTracker import DataLoadTracker
-
+from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
 
 
 class DataLoadManager(object):
-    def __init__(self, configuration_path, data_source, logger=None):
+    def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None):
         self.logger = logger or logging.getLogger(__name__)
         self.configuration_path = configuration_path
         self.data_source = data_source
-
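+        # Keep a handle to the tracking repository and tag every import from this manager run with one correlation id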
+        self.data_load_tracker_repository = data_load_tracker_repository
+        self.correlation_id = uuid.uuid4()
     def start_imports(self, target_engine, full_refresh):
         for file in os.listdir(self.configuration_path):
             self.start_single_import(target_engine, file, full_refresh)
 
         self.logger.info("Execution completed.")
 
-    def start_single_import(self, target_engine, configuration_name, requested_full_refresh):
-        self.logger.debug("Using configuration file : {0}".format(configuration_name))
+    def start_single_import(self, target_engine, model_name, requested_full_refresh):
+        self.logger.debug("Using configuration file : {0}".format(model_name))
 
-        config_file = os.path.abspath(self.configuration_path + configuration_name)
+        config_file = os.path.abspath(self.configuration_path + model_name)
         self.logger.debug("Using configuration file : {0}".format(config_file))
-        with open(config_file) as json_data:
-            pipeline_configuration = json.load(json_data)
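+        # Read the model file once: hash its raw contents for change detection, then rewind and parse the JSON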
+        with open(config_file) as json_file:
+            model_checksum = hashlib.md5(json_file.read().encode('utf-8')).hexdigest()
+            json_file.seek(0)
+            pipeline_configuration = json.load(json_file)
 
-        self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(configuration_name, requested_full_refresh))
+        self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh))
 
         destination_table_manager = DestinationTableManager(target_engine)
 
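+        # Record why a full refresh happens; the checks below may upgrade an incremental run to a full one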
+        full_refresh_reason = "Command Line Argument" if requested_full_refresh else "N/A"
         full_refresh = requested_full_refresh
         if not requested_full_refresh and not destination_table_manager.table_exists(pipeline_configuration['target_schema'],
                                                                                      pipeline_configuration['load_table']):
             self.logger.warning("The load table {0}.{1} does not exist. Swapping to full-refresh mode".format(pipeline_configuration['target_schema'],
                                                                                                               pipeline_configuration['load_table']))
-            full_refresh = True
 
-        data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh)
+            full_refresh_reason = "Destination table does not exist"
+            full_refresh = True
 
         self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
                                                      pipeline_configuration['columns'])
 
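+        # Look up the previous successful run for this model to recover its sync version and model checksum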
+        last_successful_data_load_execution = self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name)
+
+        if last_successful_data_load_execution is None:
+            last_sync_version = 0
+            full_refresh_reason = "First Execution"
+            full_refresh = True
+        else:
+            self.logger.debug("Previous Checksum {0}. Current Checksum {1}".format(last_successful_data_load_execution.model_checksum, model_checksum))
+            last_sync_version = last_successful_data_load_execution.next_sync_version
+            if not full_refresh and last_successful_data_load_execution.model_checksum != model_checksum:
+                self.logger.info("A model checksum change has forced this to be a full load")
+                full_refresh = True
+                full_refresh_reason = "Model Change"
+
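+        # Initialise change tracking on the source, starting from the last synced version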
+        change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'],
+                                                                     last_sync_version)
+
+        if not full_refresh and change_tracking_info.force_full_load:
+            self.logger.info("Change tracking has forced this to be a full load")
+            full_refresh = True
+            full_refresh_reason = "Change Tracking Invalid"
+
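+        # The tracker captures the checksum, refresh reason, change-tracking state and correlation id for this run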
+        data_load_tracker = DataLoadTracker(model_name, model_checksum, json_file, full_refresh, change_tracking_info,
+                                            self.correlation_id, full_refresh_reason)
+
         columns = pipeline_configuration['columns']
         destination_table_manager.create_schema(pipeline_configuration['target_schema'])
 
@@ -60,7 +93,9 @@ def start_single_import(self, target_engine, configuration_name, requested_full_refresh):
                                             columns,
                                             data_load_tracker,
                                             pipeline_configuration['batch'],
-                                            target_engine)
+                                            target_engine,
+                                            full_refresh,
+                                            change_tracking_info)
 
         previous_unique_column_value = 0
         while previous_unique_column_value > -1:
@@ -82,5 +117,6 @@ def start_single_import(self, target_engine, configuration_name, requested_full_refresh):
         destination_table_manager.drop_table(pipeline_configuration['target_schema'],
                                              pipeline_configuration['stage_table'])
         data_load_tracker.completed_successfully()
-        self.logger.info("Import for configuration: {0} Complete. {1}".format(configuration_name, data_load_tracker.get_statistics()))
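+        # Persist the tracking record so the next run can read back this model's checksum and sync version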
+        self.data_load_tracker_repository.save(data_load_tracker)
+        self.logger.info("Import Complete for: {0}. {1}".format(model_name, data_load_tracker.get_statistics()))
 