
Commit 11fe94e

working dbt DAG in original airflow stack
1 parent 7485d4e

28 files changed: +2118 -57 lines
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-tables-in-trino
spec:
  template:
    spec:
      serviceAccountName: demo-serviceaccount
      containers:
        - name: create-tables-in-trino
          image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
          command: ["bash", "-c", "python -u /tmp/script/script.py"]
          volumeMounts:
            - name: script
              mountPath: /tmp/script
            - name: trino-users
              mountPath: /trino-users
      volumes:
        - name: script
          configMap:
            name: create-tables-in-trino-script
        - name: trino-users
          secret:
            secretName: trino-users
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-tables-in-trino-script
data:
  script.py: |
    import sys
    import trino

    if not sys.warnoptions:
        import warnings
        warnings.simplefilter("ignore")

    def get_connection():
        connection = trino.dbapi.connect(
            host="trino-coordinator",
            port=8443,
            user="admin",
            http_scheme='https',
            auth=trino.auth.BasicAuthentication("admin", open("/trino-users/admin").read()),
        )
        connection._http_session.verify = False
        return connection

    def run_query(connection, query):
        print(f"[DEBUG] Executing query {query}")
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    connection = get_connection()

    run_query(connection, "CREATE SCHEMA iceberg.dbt_schema WITH (location = 's3a://demo/dbt_schema')")
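A quick way to confirm the Job did its work is to reuse the helpers defined in script.py above. This check is illustrative only and not part of the commit:

# Illustrative follow-up check, reusing get_connection()/run_query() from script.py above.
connection = get_connection()
for row in run_query(connection, "SHOW SCHEMAS FROM iceberg"):
    print(row)  # expect ['dbt_schema'] among the results once the Job has succeeded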
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: demo-serviceaccount
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: demo-clusterrolebinding
subjects:
  - kind: ServiceAccount
    name: demo-serviceaccount
    namespace: default
roleRef:
  kind: ClusterRole
  name: demo-clusterrole
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: demo-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - batch
    resources:
      - jobs
    verbs:
      - get
      - list
      - watch
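The ClusterRole only grants read access (get/list/watch) on pods, statefulsets, and jobs. A minimal in-cluster sanity check from a pod running under demo-serviceaccount could use the official kubernetes Python client; the client is assumed to be available in the image and this snippet is not part of the commit:

# Sketch: list Jobs from a pod running under demo-serviceaccount.
# Assumes the `kubernetes` Python package is installed in the container image.
from kubernetes import client, config

config.load_incluster_config()  # picks up the mounted serviceaccount token
jobs = client.BatchV1Api().list_namespaced_job(namespace="default")
print([j.metadata.name for j in jobs.items])  # should include create-tables-in-trino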

demos/demos-v2.yaml

Lines changed: 16 additions & 0 deletions
@@ -52,6 +52,8 @@ demos:
       - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
       - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml
       - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/06-create-opa-users.yaml
+      - plainYaml: demos/airflow-trino-dbt/serviceaccount.yaml
+      - plainYaml: demos/airflow-trino-dbt/create-trino-tables.yaml
     supportedNamespaces: []
     resourceRequests:
       cpu: 2401m
@@ -284,3 +286,17 @@
       cpu: 6400m
       memory: 12622Mi
       pvc: 20Gi
+  airflow-trino-dbt:
+    description: Airflow DAG calling DBT jobs to write data to Trino table via iceberg
+    documentation: https://docs.stackable.tech/home/stable/demos/airflow-trino-dbt
+    stackableStack: airflow-trino-dbt
+    labels:
+      - iceberg
+      - trino
+      - minio
+      - s3
+      - airflow
+    manifests:
+      - plainYaml: demos/airflow-trino-dbt/serviceaccount.yaml
+      - plainYaml: demos/airflow-trino-dbt/create-trino-tables.yaml
+    supportedNamespaces: ["default"]

stacks/airflow-trino-dbt/airflow.yaml

Lines changed: 27 additions & 13 deletions
@@ -1,4 +1,13 @@
 ---
+apiVersion: secrets.stackable.tech/v1alpha1
+kind: TrustStore
+metadata:
+  name: truststore-pem
+spec:
+  secretClassName: tls
+  format: tls-pem
+  targetKind: ConfigMap
+---
 apiVersion: airflow.stackable.tech/v1alpha1
 kind: AirflowCluster
 metadata:
@@ -21,10 +30,15 @@ spec:
       - name: airflow-dags
         configMap:
           name: airflow-dags
+      - name: s3-tls-pem
+        configMap:
+          name: truststore-pem
     volumeMounts:
       - name: airflow-dags
         mountPath: /stackable/airflow/dags/dbt.py
         subPath: dbt.py
+      - name: s3-tls-pem
+        mountPath: /stackable/s3-tls-pem
   webservers:
     roleConfig:
       listenerClass: external-stable
@@ -35,31 +49,31 @@ spec:
           max: "3"
         memory:
           limit: 3Gi
+    envOverrides: &envOverrides
+      AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
+      AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "s3_conn"
+      AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://airflow/logs"
+      AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: "FALSE"
+      AWS_ACCESS_KEY_ID: admin
+      AWS_SECRET_ACCESS_KEY: adminadmin # {{ airflowAdminPassword }}
+      AIRFLOW_CONN_S3_CONN: "{\"conn_type\": \"aws\", \"extra\": {\"endpoint_url\": \"https://minio.default.svc.cluster.local:9000\", \"verify\": \"/stackable/s3-tls-pem/ca.crt\"}}"
     roleGroups:
       default:
         replicas: 1
-  celeryExecutors:
-    config:
-      resources:
-        cpu:
-          min: "2"
-          max: "3"
-        memory:
-          limit: 4Gi
-    roleGroups:
-      default:
-        replicas: 1
-  # kubernetesExecutors:
-  #   config: {}
+  kubernetesExecutors:
+    envOverrides: *envOverrides
   schedulers:
+    envOverrides: *envOverrides
     roleGroups:
       default:
         replicas: 1
   dagProcessors:
+    envOverrides: *envOverrides
     roleGroups:
       default:
         replicas: 1
   triggerers:
+    envOverrides: *envOverrides
     roleGroups:
       default:
         replicas: 1
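The AirflowCluster mounts a dbt.py DAG from the airflow-dags ConfigMap, but that file is not shown in this diff. Purely as an illustration of the kind of DAG the stack runs, here is a minimal sketch using the KubernetesPodOperator; the DAG id, image, and paths are assumptions, not the committed dbt.py:

# Hypothetical sketch only: the real dbt.py is delivered by the airflow-dags ConfigMap
# and is not part of this diff. Names, image, and paths are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(
    dag_id="dbt_trino",               # assumed DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    KubernetesPodOperator(
        task_id="dbt_run",
        name="dbt-run",
        image="dbt-with-trino-adapter:latest",  # placeholder image containing dbt-core + dbt-trino
        cmds=["dbt"],
        arguments=["run", "--project-dir", "/dbt/dbt_test", "--profiles-dir", "/dbt/dbt_test", "--target", "iceberg"],
        env_vars={
            "TRINO_HOST": "trino-coordinator",
            "TRINO_PORT": "8443",
            "CERT_PATH": "/stackable/s3-tls-pem/ca.crt",
        },
    )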

stacks/airflow-trino-dbt/dbt/dbt_test/dbt_project.yml

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+---
 name: 'dbt_demo'
 version: '1.0.0'
 config-version: 2

stacks/airflow-trino-dbt/dbt/dbt_test/models/schema.yml

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+---
 version: 2

 models:
Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
+---
 packages: []

stacks/airflow-trino-dbt/dbt/dbt_test/profiles.yml

Lines changed: 2 additions & 1 deletion
@@ -1,3 +1,4 @@
+---
 trino_demo:
   outputs:
     iceberg:
@@ -8,7 +9,7 @@ trino_demo:
       catalog: iceberg
       host: "{{ env_var('TRINO_HOST') }}"
       port: "{{ env_var('TRINO_PORT') | int }}"
-      schema: dbt_schema3
+      schema: dbt_schema
       threads: 1
       cert: "{{ env_var('CERT_PATH') }}"
       verify: true
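profiles.yml resolves the Trino host, port, and CA bundle from environment variables, so a manual run against the iceberg target could look like this. The values shown are assumptions consistent with the rest of the stack, not prescribed by the commit:

# Sketch: run the dbt project against the 'iceberg' target defined in profiles.yml.
# The env var values are assumptions (matching the Trino coordinator used above).
import os
import subprocess

os.environ["TRINO_HOST"] = "trino-coordinator"
os.environ["TRINO_PORT"] = "8443"
os.environ["CERT_PATH"] = "/stackable/s3-tls-pem/ca.crt"  # any CA bundle the Trino cert chains to

subprocess.run(
    ["dbt", "run", "--project-dir", "dbt_test", "--profiles-dir", "dbt_test", "--target", "iceberg"],
    check=True,
)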
