Skip to content

Commit 78525b5

Browse files
pg_replication: add a typical example of initial load (#648)
* add a typical example of initial load * fixing lint errors * Example load with no manual source creation * No ambiguous source and destination pipelines --------- Co-authored-by: anuunchin <88698977+anuunchin@users.noreply.github.com>
1 parent b3ef9bf commit 78525b5

File tree

3 files changed

+146
-76
lines changed

3 files changed

+146
-76
lines changed

sources/pg_replication/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Resources that can be loaded using this verified source are:
66
| Name | Description |
77
|----------------------|-------------------------------------------------|
88
| replication_resource | Load published messages from a replication slot |
9+
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |
910

1011
## Initialize the pipeline
1112

@@ -29,6 +30,13 @@ It also needs `CREATE` privilege on the database:
2930
GRANT CREATE ON DATABASE dlt_data TO replication_user;
3031
```
3132

33+
If not a superuser, the user must have ownership of the tables that need to be replicated:
34+
35+
```sql
36+
ALTER TABLE your_table OWNER TO replication_user;
37+
```
38+
39+
3240
### Set up RDS
3341
1. You must enable replication for RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
3442
2. `WITH LOGIN REPLICATION;` does not work on RDS, instead do:

sources/pg_replication/helpers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from dlt.sources.credentials import ConnectionStringCredentials
4343
from dlt.sources.sql_database import (
4444
sql_table as core_sql_table,
45-
sql_database as core_sql_datbase,
45+
sql_database as core_sql_database,
4646
)
4747

4848
from .schema_types import _to_dlt_column_schema, _to_dlt_val
@@ -114,7 +114,8 @@ def init_replication(
114114
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
115115
to include in the publication. If not provided, the whole schema with `schema_name` will be replicated
116116
(also tables added to the schema after the publication was created). You need superuser privileges
117-
for the schema replication.
117+
for the whole schema replication. When specifying individual table names, the database role must
118+
own the tables if it's not a superuser.
118119
credentials (ConnectionStringCredentials): Postgres database credentials.
119120
publish (str): Comma-separated string of DML operations. Can be used to
120121
control which changes are included in the publication. Allowed operations
@@ -184,7 +185,7 @@ def init_replication(
184185
# do not include dlt tables
185186
table_names = [
186187
table_name
187-
for table_name in core_sql_datbase(
188+
for table_name in core_sql_database(
188189
credentials, schema=schema_name, reflection_level="minimal"
189190
).resources.keys()
190191
if not table_name.lower().startswith(DLT_NAME_PREFIX)

0 commit comments

Comments
 (0)