Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit b69b7a8

Browse files
authored
Merge pull request #53 from pageuppeople-opensource/feature/OSC-1458-reduce-api-calls-to-datasource
OSC-1458 - Reduce api calls to FooDataSource
2 parents c65d3a3 + 363b554 commit b69b7a8

File tree

4 files changed

+40
-28
lines changed

4 files changed

+40
-28
lines changed

rdl/DataLoadManager.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,29 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
8585
self.logger.error(f"Failed to read model file '{model_file_full_path}' with error: '{str(exception)}'")
8686
raise exception
8787

88-
self.source_db.assert_data_source_is_valid(model_config['source_table'], model_config['columns'])
89-
9088
last_sync_version = 0
9189
last_successful_data_load_execution = \
9290
self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name)
9391

9492
if last_successful_data_load_execution is not None:
9593
last_sync_version = last_successful_data_load_execution.sync_version
9694

95+
source_table_info = self.source_db.get_table_info(model_config['source_table'], last_sync_version)
96+
config_source_column_names = list(map(lambda col_config: col_config['source_name'], model_config['columns']))
97+
invalid_config_source_column_names = []
98+
99+
for col_name in config_source_column_names:
100+
if col_name not in source_table_info.column_names:
101+
invalid_config_source_column_names.append(col_name)
102+
103+
if invalid_config_source_column_names:
104+
message = "Column(s) {column_names} not found in source table {source_table_name}".format(
105+
column_names=", ".join(invalid_config_source_column_names),
106+
source_table_name=model_config['source_table']['name'])
107+
raise ValueError(message)
108+
97109
destination_table_manager = DestinationTableManager(self.target_db)
98-
change_tracking_info = self.source_db.get_change_tracking_info(model_config['source_table'], last_sync_version)
110+
change_tracking_info = source_table_info.change_tracking_info
99111

100112
last_successful_execution_exists = last_successful_data_load_execution is not None
101113
model_changed = (not last_successful_execution_exists) or \

rdl/data_sources/MsSqlDataSource.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from rdl.ColumnTypeResolver import ColumnTypeResolver
1414
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
15+
from rdl.data_sources.SourceTableInfo import SourceTableInfo
1516
from rdl.shared import Providers
1617
from rdl.shared.Utils import prevent_senstive_data_logging
1718

@@ -111,21 +112,11 @@ def __build_select_statement(self, table_config, columns, batch_config, batch_ke
111112

112113
return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
113114

114-
# Returns an array of configured_columns containing only columns that this data source supports. Logs invalid ones.
115-
def assert_data_source_is_valid(self, table_config, configured_columns):
115+
def get_table_info(self, table_config, last_known_sync_version):
116116
columns_in_database = self.__get_table_columns(table_config)
117-
118-
for column in configured_columns:
119-
self.__assert_column_exists(column['source_name'],
120-
columns_in_database,
121-
f"{table_config['schema']}.{table_config['name']}")
122-
123-
def __assert_column_exists(self, column_name, columns_in_database, table_name):
124-
if column_name in columns_in_database:
125-
return True
126-
127-
message = f'Column {column_name} does not exist in source table {table_name}'
128-
raise ValueError(message)
117+
change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
118+
source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
119+
return source_table_info
129120

130121
def __get_table_columns(self, table_config):
131122
metadata = MetaData()
@@ -149,7 +140,7 @@ def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker
149140

150141
return data_frame
151142

152-
def get_change_tracking_info(self, table_config, last_known_sync_version):
143+
def __get_change_tracking_info(self, table_config, last_known_sync_version):
153144

154145
if last_known_sync_version is None:
155146
last_known_sync_version = 'NULL'
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
2+
3+
4+
class SourceTableInfo:
5+
def __init__(self, column_names: [], change_tracking_info: ChangeTrackingInfo):
6+
self.column_names = column_names
7+
self.change_tracking_info = change_tracking_info

tests/unit_tests/test_MsSqlDataSource.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,20 @@ def tearDownClass(cls):
5959
TestMsSqlDataSource.data_source.database_engine.execute(text(TEAR_DOWN_STRING))
6060
TestMsSqlDataSource.data_source = None
6161

62-
def test_get_change_tracking_info(self):
62+
def test_get_table_info_change_tracking_info(self):
6363

6464
last_sync_version = None
6565
for table in TestMsSqlDataSource.table_configs:
6666
print("TESTING ON TABLE: " + table["source_table"]["name"])
6767
print("FIRST TEST: INITIALISE TABLE")
68-
results = TestMsSqlDataSource.data_source.get_change_tracking_info(
69-
table["source_table"], last_sync_version)
68+
results = TestMsSqlDataSource.data_source.get_table_info(
69+
table["source_table"], last_sync_version).change_tracking_info
7070
self.assertEqual(results.force_full_load, True)
7171

7272
print("SECOND TEST: NO CHANGES")
7373
last_sync_version = results.sync_version
74-
results = TestMsSqlDataSource.data_source.get_change_tracking_info(
75-
table["source_table"], last_sync_version)
74+
results = TestMsSqlDataSource.data_source.get_table_info(
75+
table["source_table"], last_sync_version).change_tracking_info
7676
self.assertEqual(results.force_full_load, False)
7777

7878
print("OPERATION TESTS")
@@ -82,20 +82,22 @@ def test_get_change_tracking_info(self):
8282
TestMsSqlDataSource.data_source.database_engine.execute(
8383
text(operation_string).execution_options(autocommit=True))
8484

85-
results = TestMsSqlDataSource.data_source.get_change_tracking_info(
86-
table["source_table"], last_sync_version)
85+
results = TestMsSqlDataSource.data_source.get_table_info(
86+
table["source_table"], last_sync_version).change_tracking_info
8787
self.assertEqual(results.force_full_load, False, msg="Failed on: " + operation_string)
8888
last_sync_version = results.sync_version
8989

9090
print("EXTRA TEST: NO CHANGES")
9191
last_sync_version = results.sync_version
92-
results = TestMsSqlDataSource.data_source.get_change_tracking_info(
93-
table["source_table"], last_sync_version)
92+
results = TestMsSqlDataSource.data_source.get_table_info(
93+
table["source_table"], last_sync_version).change_tracking_info
9494
self.assertEqual(results.force_full_load, False)
9595

9696
print("EXTRA TEST: LOST TRACK")
9797
last_sync_version = -1
98-
results = TestMsSqlDataSource.data_source.get_change_tracking_info(table["source_table"], last_sync_version)
98+
results = TestMsSqlDataSource.data_source\
99+
.get_table_info(table["source_table"], last_sync_version)\
100+
.change_tracking_info
99101
self.assertEqual(results.force_full_load, True)
100102

101103
def test_can_handle_connection_string(self):

0 commit comments

Comments
 (0)