Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit b9e9eee

Browse files
authored
Merge pull request #46 from PageUpPeopleOrg/feature/normalize-schema
Isolate Integration Test Execution User & Normalize Schema
2 parents 4ec8834 + ecf495a commit b9e9eee

19 files changed

+288
-61
lines changed

README.md

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,36 @@ Use the following vscode settings by either:
180180

181181
#### Integration
182182

183-
The test batch files assume there is a user by the name of `postgres` on the system.
184-
It also sends through a nonsense password -- it is assumed that the target system is running in 'trust' mode.
183+
##### Pre-requisites
185184

186-
_See [Postgres docs](https://www.postgresql.org/docs/9.1/static/auth-pg-hba-conf.html) for details on trust mode._
185+
- PostgreSQL 10+ and SQL Server 2016 are installed _(SQL Server Express also works; you would just have to find and replace the `SQL2016` connection string references with `SQLEXPRESS`)_
186+
187+
- The test batch files assume there is a user by the name of `postgres` on the system.
188+
It also sends through a nonsense password -- it is assumed that the target system is running in 'trust' mode.
189+
190+
_See [Postgres docs](https://www.postgresql.org/docs/9.1/static/auth-pg-hba-conf.html) for details on trust mode._
191+
192+
193+
##### Setup
194+
195+
Run the scripts below on SQL Server to set up a source database.
196+
197+
```
198+
./tests/integration_tests/mssql_source/source_database_setup/create_database.sql
199+
./tests/integration_tests/mssql_source/source_database_setup/create_large_table.sql
200+
./tests/integration_tests/mssql_source/source_database_setup/create_compound_pk.sql
201+
```
202+
203+
Run the scripts below on the PostgreSQL server to set up a target database.
204+
205+
```
206+
./tests/integration_tests/psql_destination/create-db.sql
207+
./tests/integration_tests/psql_destination/setup-db.sql
208+
```
209+
210+
##### Execution
211+
212+
Run the `*.cmd` test files located in `./tests/integration_tests/`. `test_full_refresh_from_mssql.cmd` is a good place to start.
187213

188214
#### Unit
189215

appveyor.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ environment:
66
PGPASSWORD: "Password12!"
77
PGPORT: 5432
88
PGHOST: "localhost"
9-
DBNAME: relational_data_loader_integration_tests
9+
DBNAME: rdl_integration_test_target_db
1010
MSSQLSTR: "{'username': '','password': '','server_string': '(local)\\SQL2016'}"
1111
matrix:
1212
- PYTHON: "C:\\Python37-x64" #This needs to be a double slash
@@ -34,17 +34,17 @@ install:
3434
build_script:
3535
#Setup the source MSSQL database
3636
- sqlcmd -b -E -S "(local)\SQL2016" -i .\tests\integration_tests\mssql_source\source_database_setup\create_database.sql
37-
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\tests\integration_tests\mssql_source\source_database_setup\create_large_table.sql
38-
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\tests\integration_tests\mssql_source\source_database_setup\create_compound_pk.sql
37+
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RDL_Integration_Test_Source_Db -i .\tests\integration_tests\mssql_source\source_database_setup\create_large_table.sql
38+
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RDL_Integration_Test_Source_Db -i .\tests\integration_tests\mssql_source\source_database_setup\create_compound_pk.sql
3939
#Setup the target PostgreSQL database
4040
- psql -c "SELECT VERSION()"
4141
- createdb %DBNAME%
42-
- psql -d %DBNAME% -U postgres -a -v ON_ERROR_STOP=1 -f .\tests\integration_tests\psql_destination\setup.sql
42+
- psql -d %DBNAME% -U postgres -a -v ON_ERROR_STOP=1 -f .\tests\integration_tests\psql_destination\setup-db.sql
4343
#Activate Virtual Environment
4444
- C:\projects\relational-data-loader\venv\Scripts\activate.bat
4545
#Install the dependencies for rdl.
4646
- pip install .
47-
- alembic -c rdl/alembic.ini -x postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests upgrade head
47+
- alembic -c rdl/alembic.ini -x postgresql+psycopg2://rdl_integration_test_user:rdl_integration_test_password@localhost/rdl_integration_test_target_db upgrade head
4848

4949
test_script:
5050
# unit tests
@@ -58,11 +58,11 @@ test_script:
5858
- .\tests\integration_tests\test_full_refresh_from_mssql.cmd
5959
- .\tests\integration_tests\test_incremental_refresh_from_mssql.cmd
6060

61-
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\tests\integration_tests\mssql_source\source_database_setup\change_compound_pk.sql
61+
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RDL_Integration_Test_Source_Db -i .\tests\integration_tests\mssql_source\source_database_setup\change_compound_pk.sql
6262

6363
- .\tests\integration_tests\test_incremental_refresh_from_mssql.cmd
6464

65-
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\tests\integration_tests\mssql_source\source_database_setup\change_large_table.sql
65+
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RDL_Integration_Test_Source_Db -i .\tests\integration_tests\mssql_source\source_database_setup\change_large_table.sql
6666

6767
- .\tests\integration_tests\test_full_refresh_from_mssql.cmd
6868
- .\tests\integration_tests\test_audit.cmd
@@ -73,4 +73,4 @@ on_finish:
7373
#Enable this line to make the build pause after completion for RDP troubleshooting.
7474
#- ps: $blockRdp = $true; iex ((new-object net.webclient).DownloadString('https://raw.githubusercontent.com/appveyor/ci/master/scripts/enable-rdp.ps1'))
7575

76-
- alembic -c rdl/alembic.ini -x postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests downgrade base
76+
- alembic -c rdl/alembic.ini -x postgresql+psycopg2://rdl_integration_test_user:rdl_integration_test_password@localhost/rdl_integration_test_target_db downgrade base
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""normalize schema
2+
3+
Revision ID: 00f2b412576b
4+
Revises: 955122a76711
5+
Create Date: 2019-05-15 21:46:42.147590
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.dialects import postgresql
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '00f2b412576b'
14+
down_revision = '955122a76711'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
    """Rebuild ``rdl.execution`` and ``rdl.execution_model`` with the normalized layout.

    The current tables are renamed with the previous revision id as a suffix,
    fresh tables are created under the original names, every row is copied
    across with its columns mapped to the new names, and finally the renamed
    originals are dropped.
    """
    previous_revision = '955122a76711'

    def timestamp_defaulting_to_now(column_name):
        # Non-nullable, timezone-aware timestamp whose server default is now().
        return sa.Column(column_name, sa.DateTime(timezone=True), server_default=sa.text('now()'),
                         nullable=False)

    # Step 1: move the existing tables aside, tagging them with the old revision.
    for table_name in ('execution', 'execution_model'):
        op.execute('ALTER TABLE rdl.{0} RENAME TO {0}_{1}'.format(table_name, previous_revision))

    # Step 2: create the tables again using the new column layout.
    execution_definition = [
        sa.Column('execution_id', postgresql.UUID(as_uuid=True), nullable=False),
        timestamp_defaulting_to_now('created_on'),
        timestamp_defaulting_to_now('updated_on'),
        sa.Column('status', sa.String(length=50), server_default='Started', nullable=False),
        timestamp_defaulting_to_now('started_on'),
        sa.Column('completed_on', sa.DateTime(timezone=True), nullable=True),
        sa.Column('execution_time_s', sa.BigInteger(), nullable=True),
        sa.Column('rows_processed', sa.BigInteger(), nullable=True),
        sa.Column('models_processed', sa.Integer(), nullable=True),
        sa.PrimaryKeyConstraint('execution_id'),
    ]
    op.create_table('execution', *execution_definition, schema='rdl')

    execution_model_definition = [
        sa.Column('execution_model_id', postgresql.UUID(as_uuid=True), nullable=False),
        timestamp_defaulting_to_now('created_on'),
        timestamp_defaulting_to_now('updated_on'),
        sa.Column('execution_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('model_name', sa.String(length=250), nullable=False),
        sa.Column('status', sa.String(length=50), server_default='Started', nullable=False),
        sa.Column('last_sync_version', sa.BigInteger(), nullable=False),
        sa.Column('sync_version', sa.BigInteger(), nullable=False),
        sa.Column('is_full_refresh', sa.Boolean(), nullable=False),
        sa.Column('full_refresh_reason', sa.String(length=100), nullable=False),
        timestamp_defaulting_to_now('started_on'),
        sa.Column('completed_on', sa.DateTime(timezone=True), nullable=True),
        sa.Column('execution_time_ms', sa.BigInteger(), nullable=True),
        sa.Column('rows_processed', sa.BigInteger(), nullable=True),
        sa.Column('model_checksum', sa.String(length=100), nullable=False),
        sa.Column('failure_reason', sa.String(length=1000), nullable=True),
        sa.ForeignKeyConstraint(['execution_id'], ['rdl.execution.execution_id'], ),
        sa.PrimaryKeyConstraint('execution_model_id'),
    ]
    op.create_table('execution_model', *execution_model_definition, schema='rdl')

    # Step 3: copy the rows over. created_on is seeded from the start timestamp
    # and updated_on from the end timestamp, falling back to the start timestamp
    # when the run never finished.
    copy_executions = '''
        INSERT INTO rdl.execution (
            execution_id, created_on, updated_on,
            status, started_on, completed_on,
            execution_time_s, rows_processed, models_processed
        )
        SELECT
            id, execution_started, COALESCE(execution_ended, execution_started),
            status, execution_started, execution_ended,
            execution_time_s, total_rows_processed, total_models_processed
        FROM rdl.execution_955122a76711
        '''
    op.execute(copy_executions)

    # The old table has no single-column key, so each copied row receives a
    # freshly generated UUID as its execution_model_id.
    # NOTE(review): uuid_generate_v4() is provided by the uuid-ossp extension --
    # confirm it is installed on the target database.
    copy_execution_models = '''
        INSERT INTO rdl.execution_model (
            execution_model_id, execution_id, created_on, updated_on,
            model_name, status, started_on, completed_on, failure_reason,
            last_sync_version, sync_version, is_full_refresh, full_refresh_reason,
            execution_time_ms, rows_processed, model_checksum
        )
        SELECT
            uuid_generate_v4(), execution_id, started_on, COALESCE(completed_on, started_on),
            model_name, status, started_on, completed_on, failure_reason,
            last_sync_version, sync_version, is_full_refresh, full_refresh_reason,
            execution_time_ms, rows_processed, model_checksum
        FROM rdl.execution_model_955122a76711
        '''
    op.execute(copy_execution_models)

    # Step 4: remove the renamed originals, execution_model first and then
    # execution.
    for table_name in ('execution_model', 'execution'):
        op.drop_table('{0}_{1}'.format(table_name, previous_revision), schema='rdl')
99+
100+
101+
def downgrade():
    """Restore ``rdl.execution`` and ``rdl.execution_model`` to the previous layout.

    Mirrors ``upgrade``: the current tables are renamed with this revision id
    as a suffix, the old-layout tables are re-created under the original names,
    the rows are copied back with their columns mapped to the old names, and
    the renamed tables are dropped. Columns that exist only in the new layout
    (created_on, updated_on, execution_model_id) are not carried over.
    """
    current_revision = '00f2b412576b'

    def timestamp_defaulting_to_now(column_name):
        # Non-nullable, timezone-aware timestamp whose server default is now().
        return sa.Column(column_name, sa.DateTime(timezone=True), server_default=sa.text('now()'),
                         nullable=False)

    # Step 1: move the existing tables aside, tagging them with this revision.
    for table_name in ('execution', 'execution_model'):
        op.execute('ALTER TABLE rdl.{0} RENAME TO {0}_{1}'.format(table_name, current_revision))

    # Step 2: re-create the tables as the previous revision defined them.
    execution_definition = [
        sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('status', sa.String(length=50), server_default='Started', nullable=False),
        timestamp_defaulting_to_now('execution_started'),
        sa.Column('execution_ended', sa.DateTime(timezone=True), nullable=True),
        sa.Column('execution_time_s', sa.BigInteger(), nullable=True),
        sa.Column('total_rows_processed', sa.BigInteger(), nullable=True),
        sa.Column('total_models_processed', sa.Integer(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    ]
    op.create_table('execution', *execution_definition, schema='rdl')

    # No key constraints are declared inline for execution_model; the primary
    # and foreign keys are added right after with explicit names (presumably
    # to match the names the earlier revision produced -- verify if it matters).
    # NOTE(review): status, execution_time_ms and rows_processed are narrower
    # here (25 chars / Integer) than in the tables being copied from, so
    # oversized values would make the copy below fail.
    execution_model_definition = [
        sa.Column('execution_id', postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('model_name', sa.String(length=250), nullable=False),
        sa.Column('status', sa.String(length=25), nullable=False),
        sa.Column('last_sync_version', sa.BigInteger(), nullable=False),
        sa.Column('sync_version', sa.BigInteger(), nullable=False),
        sa.Column('is_full_refresh', sa.Boolean(), nullable=False),
        sa.Column('full_refresh_reason', sa.String(length=100), nullable=False),
        timestamp_defaulting_to_now('started_on'),
        sa.Column('completed_on', sa.DateTime(timezone=True), nullable=True),
        sa.Column('execution_time_ms', sa.Integer(), nullable=True),
        sa.Column('rows_processed', sa.Integer(), nullable=True),
        sa.Column('model_checksum', sa.String(length=100), nullable=False),
        sa.Column('failure_reason', sa.String(length=1000), nullable=True),
    ]
    op.create_table('execution_model', *execution_model_definition, schema='rdl')

    composite_key_columns = ["execution_id", "model_name"]
    op.create_primary_key("pk_data_load_execution", "execution_model",
                          composite_key_columns, schema='rdl')
    op.create_foreign_key("data_load_execution_execution_id_fkey", 'execution_model', 'execution',
                          ['execution_id'], ['id'],
                          source_schema='rdl', referent_schema='rdl')

    # Step 3: copy the rows back, mapping the new column names to the old ones.
    copy_executions = '''
        INSERT INTO rdl.execution (
            id,
            status, execution_started, execution_ended,
            execution_time_s, total_rows_processed, total_models_processed
        )
        SELECT
            execution_id,
            status, started_on, completed_on,
            execution_time_s, rows_processed, models_processed
        FROM rdl.execution_00f2b412576b
        '''
    op.execute(copy_executions)

    copy_execution_models = '''
        INSERT INTO rdl.execution_model (
            execution_id,
            model_name, status, started_on, completed_on, failure_reason,
            last_sync_version, sync_version, is_full_refresh, full_refresh_reason,
            execution_time_ms, rows_processed, model_checksum
        )
        SELECT
            execution_id,
            model_name, status, started_on, completed_on, failure_reason,
            last_sync_version, sync_version, is_full_refresh, full_refresh_reason,
            execution_time_ms, rows_processed, model_checksum
        FROM rdl.execution_model_00f2b412576b
        '''
    op.execute(copy_execution_models)

    # Step 4: remove the renamed new-layout tables, execution_model first and
    # then execution.
    for table_name in ('execution_model', 'execution'):
        op.drop_table('{0}_{1}'.format(table_name, current_revision), schema='rdl')

rdl/alembic/versions/955122a76711_create_an_execution_summary_table.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def upgrade():
6262

6363
op.drop_column('data_load_execution', 'id', schema='rdl')
6464
op.create_primary_key("pk_data_load_execution", "data_load_execution",
65-
["execution_id", "model_name"], schema='rdl')
65+
["execution_id", "model_name"], schema='rdl')
6666

6767
op.execute('ALTER TABLE rdl.data_load_execution RENAME TO execution_model')
6868
# ### end Alembic commands ###
@@ -79,7 +79,7 @@ def downgrade():
7979
'id', sa.INTEGER(),
8080
server_default=sa.text("nextval('rdl.data_load_execution_id_seq'::regclass)"),
8181
autoincrement=True, nullable=False), schema='rdl')
82-
op.create_primary_key("pk_data_load_execution", "data_load_execution", ["id"], schema='rdl')
82+
op.create_primary_key("pk_data_load_execution", "data_load_execution", ["id"], schema='rdl')
8383

8484
op.execute('ALTER SEQUENCE rdl.data_load_execution_id_seq OWNED BY rdl.data_load_execution.id')
8585
op.drop_column('data_load_execution', 'started_on', schema='rdl')

rdl/data_load_tracking/DataLoadTrackerRepository.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def create_execution(self):
2727
session.add(new_execution)
2828
session.commit()
2929
self.logger.info(new_execution)
30-
execution_id = new_execution.id
30+
execution_id = new_execution.execution_id
3131
session.close()
3232
return execution_id
3333

@@ -38,18 +38,18 @@ def complete_execution(self, execution_id, total_number_of_models,
3838
status=Constants.ExecutionStatus.SUCCESSFUL):
3939
session = self.session_maker()
4040
current_execution = session.query(ExecutionEntity) \
41-
.filter(ExecutionEntity.id == execution_id) \
41+
.filter(ExecutionEntity.execution_id == execution_id) \
4242
.one()
4343

4444
execution_end_time = session.query(func.now()).scalar()
45-
total_execution_seconds = (execution_end_time - current_execution.execution_started).total_seconds()
46-
total_rows_processed = self.get_execution_rows(current_execution.id)
45+
total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds()
46+
total_rows_processed = self.get_execution_rows(current_execution.execution_id)
4747

48-
current_execution.total_models_processed = total_number_of_models
48+
current_execution.models_processed = total_number_of_models
4949
current_execution.status = status
50-
current_execution.execution_ended = execution_end_time
50+
current_execution.completed_on = execution_end_time
5151
current_execution.execution_time_s = total_execution_seconds
52-
current_execution.total_rows_processed = total_rows_processed
52+
current_execution.rows_processed = total_rows_processed
5353
session.commit()
5454
self.logger.info(current_execution)
5555
session.close()

rdl/data_sources/MsSqlDataSource.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ def get_change_tracking_info(self, table_config, last_known_sync_version):
222222
result = self.database_engine.execute(text(get_change_tracking_info_sql))
223223
row = result.fetchone()
224224

225-
return ChangeTrackingInfo( row["last_sync_version"], row["sync_version"],
226-
row["force_full_load"], row["data_changed_since_last_sync"])
225+
return ChangeTrackingInfo(row["last_sync_version"], row["sync_version"],
226+
row["force_full_load"], row["data_changed_since_last_sync"])
227227

228228
@staticmethod
229229
def build_where_clause(batch_key_tracker, table_alias):

0 commit comments

Comments
 (0)