61 changes: 61 additions & 0 deletions demos/airflow-scheduled-job/create-trino-tables.yaml
@@ -0,0 +1,61 @@
---
apiVersion: batch/v1
kind: Job
metadata:
name: create-tables-in-trino
spec:
template:
spec:
serviceAccountName: demo-serviceaccount
containers:
- name: create-tables-in-trino
image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
command: ["bash", "-c", "python -u /tmp/script/script.py"]
volumeMounts:
- name: script
mountPath: /tmp/script
- name: trino-users
mountPath: /trino-users
volumes:
- name: script
configMap:
name: create-tables-in-trino-script
- name: trino-users
secret:
secretName: trino-users
restartPolicy: OnFailure
backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
name: create-tables-in-trino-script
data:
script.py: |
import sys
import trino

if not sys.warnoptions:
import warnings
warnings.simplefilter("ignore")

def get_connection():
connection = trino.dbapi.connect(
host="trino-coordinator",
port=8443,
user="admin",
http_scheme='https',
auth=trino.auth.BasicAuthentication("admin", open("/trino-users/admin").read()),
)
connection._http_session.verify = False
return connection

def run_query(connection, query):
print(f"[DEBUG] Executing query {query}")
cursor = connection.cursor()
cursor.execute(query)
return cursor.fetchall()

connection = get_connection()

run_query(connection, "CREATE SCHEMA iceberg.dbt_schema WITH (location = 's3a://demo/dbt_schema')")
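
A quick way to verify the result of this Job (a minimal sketch, not part of the demo: it assumes it runs in-cluster with the trino-users secret mounted at /trino-users, like the Job above, and the SHOW SCHEMAS statement is only an illustration):

import trino

# Connect the same way the job script does; the demo disables TLS verification
connection = trino.dbapi.connect(
    host="trino-coordinator",
    port=8443,
    user="admin",
    http_scheme="https",
    auth=trino.auth.BasicAuthentication("admin", open("/trino-users/admin").read()),
)
connection._http_session.verify = False
cursor = connection.cursor()
cursor.execute("SHOW SCHEMAS FROM iceberg")
print(cursor.fetchall())  # should include dbt_schema once the Job has completed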
4 changes: 2 additions & 2 deletions demos/airflow-scheduled-job/dbt/Dockerfile
@@ -11,7 +11,7 @@ RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python packages
COPY requirements.txt .
COPY demos/airflow-scheduled-job/dbt/requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt

@@ -30,7 +30,7 @@ ENV PATH="/opt/venv/bin:$PATH"

WORKDIR /dbt

COPY dbt_test ./dbt_test
COPY demos/airflow-scheduled-job/dbt/dbt_test ./dbt_test

# Security: non-root user
RUN useradd -m -u 1000 dbt && chown -R dbt:dbt /dbt
49 changes: 49 additions & 0 deletions demos/airflow-scheduled-job/serviceaccount.yaml
@@ -0,0 +1,49 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: demo-serviceaccount
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: demo-clusterrolebinding
subjects:
- kind: ServiceAccount
name: demo-serviceaccount
namespace: default
roleRef:
kind: ClusterRole
name: demo-clusterrole
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: demo-clusterrole
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
2 changes: 2 additions & 0 deletions demos/demos-v2.yaml
@@ -52,6 +52,8 @@ demos:
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/06-create-opa-users.yaml
- plainYaml: demos/airflow-scheduled-job/serviceaccount.yaml
- plainYaml: demos/airflow-scheduled-job/create-trino-tables.yaml
supportedNamespaces: []
resourceRequests:
cpu: 2401m
Binary file modified docs/modules/demos/images/airflow-scheduled-job/airflow_2.png
Binary file modified docs/modules/demos/images/airflow-scheduled-job/airflow_8.png
Binary file modified docs/modules/demos/images/airflow-scheduled-job/overview.png
97 changes: 75 additions & 22 deletions docs/modules/demos/pages/airflow-scheduled-job.adoc

Large diffs are not rendered by default.

125 changes: 112 additions & 13 deletions stacks/airflow/airflow.yaml
@@ -11,7 +11,7 @@ spec:
clusterConfig:
authorization:
opa:
configMapName: opa-airflow
configMapName: opa
package: airflow
cache:
entryTimeToLive: 5s
@@ -22,7 +22,7 @@ spec:
- name: airflow-dags
configMap:
name: airflow-dags
- name: kafka-tls-pem
- name: tls-pem
configMap:
name: truststore-pem
volumeMounts:
@@ -41,8 +41,11 @@ spec:
- name: airflow-dags
mountPath: /dags/triggerer.py
subPath: triggerer.py
- name: kafka-tls-pem
mountPath: /stackable/kafka-tls-pem
- name: airflow-dags
mountPath: /dags/dbt.py
subPath: dbt.py
- name: tls-pem
mountPath: /stackable/tls-pem
webservers:
roleConfig:
listenerClass: external-stable
@@ -59,6 +62,10 @@ spec:
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
# Airflow 3: Disable decision caching for easy debugging
AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0"
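# Ship task logs to the MinIO S3 bucket via the s3_conn connection defined in the executor env below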
AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "s3_conn"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://airflow/logs"
AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: "FALSE"
configOverrides:
webserver_config.py:
# Allow "POST /login/" without CSRF token
@@ -70,19 +77,41 @@ spec:
image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev
imagePullPolicy: IfNotPresent
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: AIRFLOW_CONN_KAFKA_CONN
value: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}"
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-s3-credentials
key: secretKey
- name: AWS_ACCESS_KEY_ID
value: admin
- name: AIRFLOW_CONN_KAFKA_CONN
value: '{"conn_type": "kafka", "extra": {"bootstrap.servers": "kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093", "security.protocol": "SSL", "ssl.ca.location": "/stackable/tls-pem/ca.crt", "group.id": "airflow_group", "auto.offset.reset": "latest"}}'
- name: AIRFLOW_CONN_S3_CONN
value: '{"conn_type": "aws", "extra": {"endpoint_url": "https://minio.$(NAMESPACE).svc.cluster.local:9000", "verify": "/stackable/tls-pem/ca.crt"}}'
roleGroups:
default:
replicas: 1
kubernetesExecutors:
# do not apply the podOverrides here as we don't need them and they will interfere
# with the pod template
# apply the podOverrides to the *base* container
envOverrides: *envOverrides
podOverrides:
spec:
containers:
- name: base
image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev
imagePullPolicy: IfNotPresent
env:
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-s3-credentials
key: secretKey
- name: AWS_ACCESS_KEY_ID
value: admin
schedulers:
envOverrides: *envOverrides
podOverrides: *podOverrides
@@ -107,6 +136,77 @@ kind: ConfigMap
metadata:
name: airflow-dags
data:
dbt.py: |
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from kubernetes.client import V1EnvVar, V1EnvVarSource, V1SecretKeySelector

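# The ephemeral volume below is provisioned by the Stackable secret-operator
# (storageClass secrets.stackable.tech): it requests TLS material (including ca.crt)
# from the trino-tls SecretClass so the dbt pod can verify the Trino coordinator's
# certificate (mounted at /dbt/trusted and referenced via CERT_PATH further down).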
tls_volume = k8s.V1Volume(
name="server-tls-mount",
ephemeral=k8s.V1EphemeralVolumeSource(
volume_claim_template=k8s.V1PersistentVolumeClaimTemplate(
metadata=k8s.V1ObjectMeta(
annotations={
"secrets.stackable.tech/class": "trino-tls",
"secrets.stackable.tech/scope": "pod,node"
}
),
spec=k8s.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=k8s.V1ResourceRequirements(
requests={"storage": "1"}
),
storage_class_name="secrets.stackable.tech"
)
)
)
)

tls_volume_mount = k8s.V1VolumeMount(
name="server-tls-mount", mount_path="/dbt/trusted"
)

pod_security_context = k8s.V1PodSecurityContext(
fs_group=1000
)

with DAG(
dag_id="run_dbt",
schedule=None,
tags=["Demo", "DBT"],
catchup=False
) as dag:
run_dbt = KubernetesPodOperator(
image="oci.stackable.tech/demos/dbt-demo:0.0.1",
image_pull_policy="IfNotPresent",
cmds=["/bin/bash", "-x", "-euo", "pipefail", "-c"],
arguments=["cd /dbt/dbt_test && export DBT_PROFILES_DIR=/dbt/dbt_test && dbt debug && dbt run && dbt test"],
name="run-dbt",
task_id="dbt-test",
get_logs=True,
volumes=[tls_volume],
volume_mounts=[tls_volume_mount],
env_vars=[
V1EnvVar(
name="TRINO_PASSWORD",
value_from=V1EnvVarSource(
secret_key_ref=V1SecretKeySelector(
name="trino-users",
key="admin"
)
)
),
V1EnvVar(name="TRINO_USER", value="admin"),
V1EnvVar(name="TRINO_HOST", value="trino-coordinator-default-headless.default.svc.cluster.local"),
V1EnvVar(name="TRINO_PORT", value="8443"),
V1EnvVar(name="CERT_PATH", value="/dbt/trusted/ca.crt"),
],
security_context=pod_security_context,
startup_timeout_seconds=600
)
run_dbt

kafka.py: |
from airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -466,7 +566,6 @@ data:
limit: 1024Mi
replicas: 3


# {% endraw %}
---
apiVersion: v1
28 changes: 28 additions & 0 deletions stacks/airflow/hive-metastores.yaml
@@ -0,0 +1,28 @@
---
apiVersion: hive.stackable.tech/v1alpha1
kind: HiveCluster
metadata:
name: hive-iceberg
spec:
image:
productVersion: 4.1.0
clusterConfig:
database:
connString: jdbc:postgresql://postgresql-hive-iceberg:5432/hive
dbType: postgres
credentialsSecret: postgres-credentials
s3:
reference: minio
metastore:
roleGroups:
default:
replicas: 1
---
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
type: Opaque
stringData:
username: hive
password: hive