Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 5fc95b6

Browse files
committed
Do a full refresh on model change
1 parent ce09d68 commit 5fc95b6

File tree

6 files changed

+38
-19
lines changed

6 files changed

+38
-19
lines changed

integration_tests/mssql_source/config/LargeTableTest.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
{
2-
32
"source_table": {
43
"name": "LargeTable",
54
"schema": "dbo",

integration_tests/mssql_source/source_database_setup/create_database.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@ IF NOT EXISTS (SELECT * FROM sys.databases WHERE Name = 'RelationalDataLoaderInt
22
CREATE DATABASE RelationalDataLoaderIntegrationTestSource
33

44
ALTER DATABASE RelationalDataLoaderIntegrationTestSource
5-
SET CHANGE_TRACKING = ON
6-
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
5+
SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)

modules/DataLoadManager.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import json
33
import uuid
44
import logging
5+
import hashlib
6+
57
from modules.BatchDataLoader import BatchDataLoader
68
from modules.DestinationTableManager import DestinationTableManager
79
from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
@@ -25,35 +27,54 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh)
2527

2628
config_file = os.path.abspath(self.configuration_path + model_name)
2729
self.logger.debug("Using configuration file : {0}".format(config_file))
28-
with open(config_file) as json_data:
29-
pipeline_configuration = json.load(json_data)
30+
with open(config_file) as json_file:
31+
model_checksum = hashlib.md5(json_file.read().encode('utf-8')).hexdigest()
32+
json_file.seek(0)
33+
pipeline_configuration = json.load(json_file)
3034

3135
self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh))
3236

3337
destination_table_manager = DestinationTableManager(target_engine)
3438

39+
full_refresh_reason = "Command Line Argument" if requested_full_refresh else "N/A"
3540
full_refresh = requested_full_refresh
3641
if not requested_full_refresh and not destination_table_manager.table_exists(pipeline_configuration['target_schema'],
3742
pipeline_configuration['load_table']):
3843
self.logger.warning("The load table {0}.{1} does not exist. Swapping to full-refresh mode".format(pipeline_configuration['target_schema'],
3944
pipeline_configuration['load_table']))
45+
46+
full_refresh_reason = "Destination table does not exist"
4047
full_refresh = True
4148

4249
self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
4350
pipeline_configuration['columns'])
4451

45-
last_sync_version = self.data_load_tracker_repository.get_last_sync_version(model_name)
52+
last_successful_data_load_execution = self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name)
53+
54+
if last_successful_data_load_execution is None:
55+
last_sync_version = 0
56+
full_refresh_reason = "First Execution"
57+
full_refresh = True,
58+
else:
59+
self.logger.debug("Previous Checksum {0}. Current Checksum {1}".format(last_successful_data_load_execution.model_checksum, model_checksum))
60+
last_sync_version = last_successful_data_load_execution.next_sync_version
61+
if not full_refresh and last_successful_data_load_execution.model_checksum != model_checksum:
62+
self.logger.info("A model checksum change has forced this to be a full load")
63+
full_refresh = True
64+
full_refresh_reason = "Model Change"
4665

4766
change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'],
4867
last_sync_version)
4968

5069

51-
data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id)
5270

53-
self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load))
5471
if not full_refresh and change_tracking_info.force_full_load:
5572
self.logger.info("Change tracking has forced this to be a full load")
5673
full_refresh = True
74+
full_refresh_reason = "Change Tracking Invalid"
75+
76+
data_load_tracker = DataLoadTracker(model_name, model_checksum, json_file, full_refresh, change_tracking_info,
77+
self.correlation_id, full_refresh_reason)
5778

5879
columns = pipeline_configuration['columns']
5980
destination_table_manager.create_schema(pipeline_configuration['target_schema'])

modules/data_load_tracking/DataLoadExecution.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ class DataLoadExecution(Base):
1717
rows_processed = Column(Integer, nullable=False)
1818
correlation_id = Column(String(250), nullable=True)
1919
status = Column(String(25), nullable=False)
20-
20+
model_checksum = Column(String(100), nullable=False)
21+
full_refresh_reason = Column(String(100), nullable=False)

modules/data_load_tracking/DataLoadTracker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@ class DataLoadTracker:
1414
total_row_count = 0
1515
rows_per_second = 0
1616
correlation_id = None,
17+
full_refresh_reason = "N/A"
1718

18-
def __init__(self, model_name, configuration, is_full_refresh, change_tracking_info, correlation_id):
19+
def __init__(self, model_name, model_checksum, configuration, is_full_refresh, change_tracking_info, correlation_id, full_refresh_reason):
1920
self.model_name = model_name
21+
self.model_checksum = model_checksum
2022
self.configuration = configuration
2123
self.is_full_refresh = is_full_refresh
2224
self.started = datetime.now()
2325
self.status = "Not Started"
2426
self.change_tracking_info = change_tracking_info
2527
self.correlation_id = correlation_id
28+
self.full_refresh_reason = full_refresh_reason
2629

2730
def start_batch(self):
2831
batch = self.Batch()

modules/data_load_tracking/DataLoadTrackerRepository.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ def create_tables(self, engine):
1212
engine.execute("CREATE SCHEMA IF NOT EXISTS {0}".format("data_pipeline"))
1313
Base.metadata.create_all(engine)
1414

15-
def get_last_sync_version(self, model_name):
15+
def get_last_successful_data_load_execution(self, model_name):
1616
session = self.session_maker()
17-
result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first()
18-
19-
if result is None:
20-
return 0
21-
return result.next_sync_version
17+
return session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first()
2218

2319

2420
def save(self, data_load_tracker):
@@ -30,9 +26,9 @@ def save(self, data_load_tracker):
3026
next_sync_version=data_load_tracker.change_tracking_info.next_sync_version,
3127
execution_time_ms=int(data_load_tracker.total_execution_time.total_seconds() * 1000),
3228
rows_processed=data_load_tracker.total_row_count,
33-
status=data_load_tracker.status)
34-
35-
29+
status=data_load_tracker.status,
30+
model_checksum=data_load_tracker.model_checksum,
31+
full_refresh_reason = data_load_tracker.full_refresh_reason)
3632

3733
session = self.session_maker()
3834
session.add(data_load_execution)

0 commit comments

Comments
 (0)