Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit ccc6e99

Browse files
committed
Fixed issue with change tracking order
1 parent 9e8e5e5 commit ccc6e99

File tree

7 files changed

+13
-16
lines changed

7 files changed

+13
-16
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"python.pythonPath": "${workspaceFolder}\\venv\\Scripts\\python.exe"
3+
}

modules/DataLoadManager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh)
5050

5151
data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id)
5252

53-
5453
self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load()))
5554
if not full_refresh and change_tracking_info.force_full_load():
5655
self.logger.info("Change tracking has forced this to be a full load")

modules/DestinationTableManager.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def create_schema(self, schema_name):
2424
def table_exists(self, schema_name, table_name):
2525
return self.target_engine.dialect.has_table(self.target_engine, table_name, schema_name)
2626

27-
2827
def drop_table(self, schema_name, table_name):
2928
metadata = MetaData()
3029
self.logger.debug(
@@ -36,9 +35,6 @@ def drop_table(self, schema_name, table_name):
3635
self.logger.debug(
3736
"Dropped table {0}.{1}".format(schema_name, table_name))
3837

39-
40-
41-
4238
def create_table(self, schema_name, table_name, columns_configuration, drop_first):
4339
metadata = MetaData()
4440

@@ -63,7 +59,6 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
6359
self.logger.debug(
6460
"Dropped table {0}.{1}".format(schema_name, table_name))
6561

66-
6762
self.logger.debug("Creating table {0}.{1}".format(schema_name, table_name))
6863
table.create(self.target_engine, checkfirst=False)
6964
self.logger.debug("Created table {0}.{1}".format(schema_name, table_name))
@@ -75,7 +70,6 @@ def create_column(self, configuration):
7570
primary_key=configuration.get("primary_key", False),
7671
nullable=configuration['nullable'])
7772

78-
7973
def rename_table(self, schema_name, source_table_name, target_table_name):
8074

8175
# Steps to efficiently rename a table.
@@ -122,8 +116,9 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
122116
column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME)
123117
column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME)
124118

125-
126-
primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']]
119+
primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in
120+
columns_configuration if 'primary_key' in column_configuration['destination'] and
121+
column_configuration['destination']['primary_key']]
127122

128123
primary_key_column_list = ','.join(map(str, primary_key_column_array))
129124

@@ -134,15 +129,15 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
134129
sql_builder.write(os.linesep)
135130
sql_builder.write(" ON CONFLICT({0}) DO UPDATE SET ".format(primary_key_column_list))
136131

137-
for column_configuratiomn in columns_configuration:
138-
sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuratiomn['destination']['name']))
132+
for column_configuration in columns_configuration:
133+
sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuration['destination']['name']))
139134
sql_builder.write(os.linesep)
140135

141136
sql_builder.write("{0} = EXCLUDED.{0},".format(self.TIMESTAMP_COLUMN_NAME))
142137
sql_builder.write(os.linesep)
143138
sql_builder.write("{0} = EXCLUDED.{0},".format(self.IS_DELETED_COLUMN_NAME))
144139
sql_builder.write(os.linesep)
145-
sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME))
140+
sql_builder.write("{0} = EXCLUDED.{0}".format(self.CHANGE_VERSION_COLUMN_NAME))
146141
sql_builder.write(os.linesep)
147142

148143
self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue()))

modules/data_load_tracking/DataLoadTracker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def start_batch(self):
3232
def completed_successfully(self):
3333
self.completed = datetime.now()
3434
self.total_execution_time = self.completed - self.started
35-
35+
self.status = "Completed Successfully"
3636
for batch in self.batches:
3737
self.total_row_count = self.total_row_count + batch.row_count
3838

@@ -91,7 +91,6 @@ def load_completed_successfully(self):
9191
self.load_rows_per_second = self.row_count / self.load_execution_time.total_seconds()
9292

9393

94-
# TODO: remove
9594
def load_skipped_due_to_zero_rows(self):
9695
self.status = "Skipped - Zero Rows"
9796
self.load_completed = datetime.now()

modules/data_load_tracking/DataLoadTrackerRepository.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from sqlalchemy import desc
23
from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base
34

45

@@ -13,7 +14,7 @@ def create_tables(self, engine):
1314

1415
def get_last_sync_version(self, model_name):
1516
session = self.session_maker()
16-
result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Load Completed Successfully").order_by(DataLoadExecution.completed_on).first()
17+
result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first()
1718

1819
if result is None:
1920
return 0

modules/data_sources/ChangeTrackingInfo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ def __init__(self, this_sync_version, next_sync_version):
88
self.next_sync_version = next_sync_version
99

1010
def force_full_load(self):
11-
return bool(self.next_sync_version == 0)
11+
return bool(self.this_sync_version == 0 or self.next_sync_version == 0)
1212

1313

File renamed without changes.

0 commit comments

Comments
 (0)