diff --git a/demos/airflow-scheduled-job/create-trino-tables.yaml b/demos/airflow-scheduled-job/create-trino-tables.yaml new file mode 100644 index 00000000..7a841b8b --- /dev/null +++ b/demos/airflow-scheduled-job/create-trino-tables.yaml @@ -0,0 +1,61 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: create-tables-in-trino +spec: + template: + spec: + serviceAccountName: demo-serviceaccount + containers: + - name: create-tables-in-trino + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + command: ["bash", "-c", "python -u /tmp/script/script.py"] + volumeMounts: + - name: script + mountPath: /tmp/script + - name: trino-users + mountPath: /trino-users + volumes: + - name: script + configMap: + name: create-tables-in-trino-script + - name: trino-users + secret: + secretName: trino-users + restartPolicy: OnFailure + backoffLimit: 50 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: create-tables-in-trino-script +data: + script.py: | + import sys + import trino + + if not sys.warnoptions: + import warnings + warnings.simplefilter("ignore") + + def get_connection(): + connection = trino.dbapi.connect( + host="trino-coordinator", + port=8443, + user="admin", + http_scheme='https', + auth=trino.auth.BasicAuthentication("admin", open("/trino-users/admin").read()), + ) + connection._http_session.verify = False + return connection + + def run_query(connection, query): + print(f"[DEBUG] Executing query {query}") + cursor = connection.cursor() + cursor.execute(query) + return cursor.fetchall() + + connection = get_connection() + + run_query(connection, "CREATE SCHEMA iceberg.dbt_schema WITH (location = 's3a://demo/dbt_schema')") diff --git a/demos/airflow-scheduled-job/dbt/Dockerfile b/demos/airflow-scheduled-job/dbt/Dockerfile index 5057bbbb..ea481e0b 100644 --- a/demos/airflow-scheduled-job/dbt/Dockerfile +++ b/demos/airflow-scheduled-job/dbt/Dockerfile @@ -11,7 +11,7 @@ RUN python -m venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" # 
Install Python packages -COPY requirements.txt . +COPY demos/airflow-scheduled-job/dbt/requirements.txt . RUN pip install --no-cache-dir --upgrade pip && \ pip install --no-cache-dir -r requirements.txt @@ -30,7 +30,7 @@ ENV PATH="/opt/venv/bin:$PATH" WORKDIR /dbt -COPY dbt_test ./dbt_test +COPY demos/airflow-scheduled-job/dbt/dbt_test ./dbt_test # Security: non-root user RUN useradd -m -u 1000 dbt && chown -R dbt:dbt /dbt diff --git a/demos/airflow-scheduled-job/serviceaccount.yaml b/demos/airflow-scheduled-job/serviceaccount.yaml new file mode 100644 index 00000000..90f2f9b2 --- /dev/null +++ b/demos/airflow-scheduled-job/serviceaccount.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: demo-serviceaccount + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: demo-clusterrolebinding +subjects: + - kind: ServiceAccount + name: demo-serviceaccount + namespace: default +roleRef: + kind: ClusterRole + name: demo-clusterrole + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: demo-clusterrole +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - list + - watch + - apiGroups: + - batch + resources: + - jobs + verbs: + - get + - list + - watch diff --git a/demos/demos-v2.yaml b/demos/demos-v2.yaml index 9b1c6901..b4b7ec4c 100644 --- a/demos/demos-v2.yaml +++ b/demos/demos-v2.yaml @@ -52,6 +52,8 @@ demos: - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/06-create-opa-users.yaml + - plainYaml: 
https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/serviceaccount.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/create-trino-tables.yaml supportedNamespaces: [] resourceRequests: cpu: 2401m diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_18.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_18.png new file mode 100644 index 00000000..90960601 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_18.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png index c86204d7..6fb1f6ef 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png and b/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_8.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_8.png index 61500d75..6442f1ca 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/airflow_8.png and b/docs/modules/demos/images/airflow-scheduled-job/airflow_8.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/overview.png b/docs/modules/demos/images/airflow-scheduled-job/overview.png index 0329bd4b..4d859d59 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/overview.png and b/docs/modules/demos/images/airflow-scheduled-job/overview.png differ diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index a917440f..1f9245e3 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -35,10 +35,13 @@ This demo will * Install the required Stackable operators * Spin up the following data products -** *Postgresql*: An open-source database used for Airflow cluster and job metadata. -** *Airflow*: An open-source workflow management platform for data engineering pipelines. 
-** *Kafka*: An open-source messaging broker that will be used to trigger an Airflow DAG. -** *Open Policy Agent*: An open-source policy engine used for user authorization. +** *Postgresql*: An open-source database used for Airflow cluster and job metadata +** *Airflow*: An open-source workflow management platform for data engineering pipelines +** *Kafka*: An open-source messaging broker that will be used to trigger an Airflow DAG +** *Open Policy Agent*: An open-source policy engine used for user authorization +** *Trino*: A fast distributed SQL query engine for big data analytics that helps you explore your data universe. This + demo uses it to enable SQL access to the data +** *MinIO*: A S3 compatible object store. This demo uses it as persistent storage to store the Trino data and Airflow logs * Mount several Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use * Enable and schedule the jobs * Verify the job status with the Airflow Webserver UI @@ -55,26 +58,34 @@ To list the installed Stackable services run the following command: [source,console] ---- -$ stackablectl stacklet list -n airflow-demo - -┌─────────┬─────────────┬──────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────┐ -│ PRODUCT ┆ NAME ┆ NAMESPACE ┆ ENDPOINTS ┆ CONDITIONS │ -╞═════════╪═════════════╪══════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════════╡ -│ airflow ┆ airflow ┆ airflow-demo ┆ webserver-http http://172.19.0.6:32111 ┆ Available, Reconciling, Running │ -├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ kafka ┆ kafka ┆ airflow-demo ┆ broker-default-0-listener-broker-kafka-tls 
kafka-broker-default-0-listener-broker.airflow-demo.svc.cluster.local:9093 ┆ Available, Reconciling, Running │ -│ ┆ ┆ ┆ broker-default-0-listener-broker-metrics kafka-broker-default-0-listener-broker.airflow-demo.svc.cluster.local:9606 ┆ │ -│ ┆ ┆ ┆ broker-default-bootstrap-kafka-tls kafka-broker-default-bootstrap.airflow-demo.svc.cluster.local:9093 ┆ │ -├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ opa ┆ opa-airflow ┆ airflow-demo ┆ ┆ Available, Reconciling, Running │ -└─────────┴─────────────┴──────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────────────────┘ +$ stackablectl stacklet list + +┌─────────┬───────────────┬───────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────┐ +│ PRODUCT ┆ NAME ┆ NAMESPACE ┆ ENDPOINTS ┆ CONDITIONS │ +╞═════════╪═══════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════════╡ +│ airflow ┆ airflow ┆ default ┆ webserver-http http://172.19.0.3:31483 ┆ Available, Reconciling, Running │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ hive ┆ hive-iceberg ┆ default ┆ metastore-hive hive-iceberg-metastore.default.svc.cluster.local:9083 ┆ Available, Reconciling, Running │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ kafka ┆ kafka ┆ default ┆ broker-default-0-listener-broker-kafka-tls ┆ Available, Reconciling, Running │ 
+│ ┆ ┆ ┆ kafka-broker-default-0-listener-broker.default.svc.cluster.local:9093 ┆ │ +│ ┆ ┆ ┆ broker-default-0-listener-broker-metrics ┆ │ +│ ┆ ┆ ┆ kafka-broker-default-0-listener-broker.default.svc.cluster.local:9606 ┆ │ +│ ┆ ┆ ┆ broker-default-bootstrap-kafka-tls kafka-broker-default-bootstrap.default.svc.cluster.local:9093 ┆ │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ opa ┆ opa ┆ default ┆ ┆ Available, Reconciling, Running │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ trino ┆ trino ┆ default ┆ coordinator-https https://172.19.0.5:31087 ┆ Available, Reconciling, Running │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ minio ┆ minio-console ┆ default ┆ https https://172.19.0.4:31792 ┆ │ +└─────────┴───────────────┴───────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────────────────┘ ---- include::partial$instance-hint.adoc[] == Airflow Webserver UI -Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.19.0.6:32111` in this case). +Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.19.0.3:31483` in this case). image::airflow-scheduled-job/airflow_1.png[] @@ -112,14 +123,22 @@ In the left-side pane the DAG is displayed either as a graph (this job is so sim image::airflow-scheduled-job/airflow_7.png[] +[NOTE] +==== +The first couple of runs of this DAG _may_ result in the tasks not being ready and being skipped, resulting in an error in the UI. 
+This is a side-effect of the scheduling mechanism within Airflow: for example, due to the rapid succession of task runs triggered by frequent scheduling. These errors should not appear on later task runs. +==== + Click on the `run_every_minute` box in the centre of the page to select the logs: [WARNING] ==== -In this demo, the logs are not available when the KubernetesExecutor is deployed. +In this demo, the KubernetesExecutor is deployed which means that logs are only preserved (and available in the UI) if either remote logging or the SDP logging framework is configured. +In this demo we set up remote logging using S3/Minio. +Since Minio in this case is set up with TLS, the Airflow connection requires that the webserver has access to a relevant certificate and that every pod has environment variables containing the access and secret keys. See the https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/kubernetes.html#managing-dags-and-logs[Airflow Documentation] for more details. -If you are interested in persisting the logs, take a look at the xref:logging.adoc[] demo. +If you are interested in persisting the logs using the SDP logging framework, take a look at the xref:logging.adoc[] demo. ==== image::airflow-scheduled-job/airflow_8.png[] @@ -215,14 +234,48 @@ image::airflow-scheduled-job/deferrable_04_queued.png[] image::airflow-scheduled-job/deferrable_05_running.png[] image::airflow-scheduled-job/deferrable_06_success.png[] -Now log out and log in again as `richard.roe`. On the home screen no DAGs are visible, as expected by the authorization rules defined for this user: +Now log out and log in again as `richard.roe`. +On the home screen no DAGs are visible, as expected by the authorization rules defined for this user: image::airflow-scheduled-job/opa_01.png[] +=== `run_dbt` DAG + +Log back into the UI as `admin`. +Select the DAGs view, making sure the filter is set to `All`. 
+Trigger a run on the `run_dbt` DAG (clicking on the trigger button will automatically activate a disabled DAG). +Switch to the task view and click on the `dbt-test` under Task ID. +It takes a few moments before the logs appear: this is because Airflow is configured to use the KubernetesExecutor - which creates new Pods for each DAG task - and as the DAG itself uses the KubernetesPodOperator, this means that _another_ Pod is spawned. +Both of these are terminated on DAG completion. +The pattern is probably overkill for simple scenarios, but we use it in this demo to show that logs from the final task are written to and retrievable from the S3 location defined in the Airflow cluster. + +image::airflow-scheduled-job/airflow_18.png[] + +[NOTE] +==== +If you are tracking the logs in the UI from the task as it is running, you may need to refresh the screen on task completion to see the final logs. +This is because they are written to the running Pod while the task is running, but are then written to S3 upon completion. +==== + +If you switch to the `Code` tab you will see the following: + +[source,python] +---- + run_dbt = KubernetesPodOperator( + image="oci.stackable.tech/demos/dbt-demo:0.0.1", + image_pull_policy="IfNotPresent", + cmds=["/bin/bash", "-x", "-euo", "pipefail", "-c"], + arguments=["cd /dbt/dbt_test && export DBT_PROFILES_DIR=/dbt/dbt_test && dbt debug && dbt run && dbt test"], + ... + ) +---- + +The task checks the configuration, runs a task that inserts some dummy data into a table, and then runs some tests to verify the result. +The details of the simple DBT project can be found https://github.com/stackabletech/demos/tree/main/demos/airflow-scheduled-job/dbt/dbt_test[in the demos repository]. == Patching Airflow to stress-test DAG parsing using relevant environment variables -Log back into the UI as `admin`. +Make sure you are still logged in as `admin`. 
The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). To show these individual DAGs in the overall list (and to remove the existing ones), adjust the volumeMounts as shown below. diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml index 7e4ca791..46f1546e 100644 --- a/stacks/airflow/airflow.yaml +++ b/stacks/airflow/airflow.yaml @@ -11,7 +11,7 @@ spec: clusterConfig: authorization: opa: - configMapName: opa-airflow + configMapName: opa package: airflow cache: entryTimeToLive: 5s @@ -22,7 +22,7 @@ spec: - name: airflow-dags configMap: name: airflow-dags - - name: kafka-tls-pem + - name: tls-pem configMap: name: truststore-pem volumeMounts: @@ -41,8 +41,11 @@ spec: - name: airflow-dags mountPath: /dags/triggerer.py subPath: triggerer.py - - name: kafka-tls-pem - mountPath: /stackable/kafka-tls-pem + - name: airflow-dags + mountPath: /dags/dbt.py + subPath: dbt.py + - name: tls-pem + mountPath: /stackable/tls-pem webservers: roleConfig: listenerClass: external-stable @@ -59,6 +62,10 @@ spec: AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" # Airflow 3: Disable decision caching for easy debugging AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" + AIRFLOW__LOGGING__REMOTE_LOGGING: "True" + AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "s3_conn" + AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://airflow/logs" + AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: "FALSE" configOverrides: webserver_config.py: 
# Allow "POST /login/" without CSRF token @@ -70,19 +77,41 @@ spec: image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev imagePullPolicy: IfNotPresent env: - - name: NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: AIRFLOW_CONN_KAFKA_CONN - value: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}" + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-s3-credentials + key: secretKey + - name: AWS_ACCESS_KEY_ID + value: admin + - name: AIRFLOW_CONN_KAFKA_CONN + value: '{"conn_type": "kafka", "extra": {"bootstrap.servers": "kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093", "security.protocol": "SSL", "ssl.ca.location": "/stackable/tls-pem/ca.crt", "group.id": "airflow_group", "auto.offset.reset": "latest"}}' + - name: AIRFLOW_CONN_S3_CONN + value: '{"conn_type": "aws", "extra": {"endpoint_url": "https://minio.$(NAMESPACE).svc.cluster.local:9000", "verify": "/stackable/tls-pem/ca.crt"}}' roleGroups: default: replicas: 1 kubernetesExecutors: - # do not apply the podOverrides here as we don't need and it will interfere - # with the pod template + # apply the podOverrides to the *base* container envOverrides: *envOverrides + podOverrides: + spec: + containers: + - name: base + image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + env: + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-s3-credentials + key: secretKey + - name: AWS_ACCESS_KEY_ID + value: admin schedulers: envOverrides: *envOverrides podOverrides: *podOverrides @@ -107,6 +136,77 @@ kind: ConfigMap metadata: name: airflow-dags data: + 
dbt.py: | + from airflow import DAG + from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + from kubernetes.client import models as k8s + from kubernetes.client import V1EnvVar, V1EnvVarSource, V1SecretKeySelector + + tls_volume = k8s.V1Volume( + name="server-tls-mount", + ephemeral=k8s.V1EphemeralVolumeSource( + volume_claim_template=k8s.V1PersistentVolumeClaimTemplate( + metadata=k8s.V1ObjectMeta( + annotations={ + "secrets.stackable.tech/class": "trino-tls", + "secrets.stackable.tech/scope": "pod,node" + } + ), + spec=k8s.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=k8s.V1ResourceRequirements( + requests={"storage": "1"} + ), + storage_class_name="secrets.stackable.tech" + ) + ) + ) + ) + + tls_volume_mount = k8s.V1VolumeMount( + name="server-tls-mount", mount_path="/dbt/trusted" + ) + + pod_security_context = k8s.V1PodSecurityContext( + fs_group=1000 + ) + + with DAG( + dag_id="run_dbt", + schedule=None, + tags=["Demo", "DBT"], + catchup=False + ) as dag: + run_dbt = KubernetesPodOperator( + image="oci.stackable.tech/demos/dbt-demo:0.0.1", + image_pull_policy="IfNotPresent", + cmds=["/bin/bash", "-x", "-euo", "pipefail", "-c"], + arguments=["cd /dbt/dbt_test && export DBT_PROFILES_DIR=/dbt/dbt_test && dbt debug && dbt run && dbt test"], + name="run-dbt", + task_id="dbt-test", + get_logs=True, + volumes=[tls_volume], + volume_mounts=[tls_volume_mount], + env_vars=[ + V1EnvVar( + name="TRINO_PASSWORD", + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name="trino-users", + key="admin" + ) + ) + ), + V1EnvVar(name="TRINO_USER", value="admin"), + V1EnvVar(name="TRINO_HOST", value="trino-coordinator-default-headless.default.svc.cluster.local"), + V1EnvVar(name="TRINO_PORT", value="8443"), + V1EnvVar(name="CERT_PATH", value="/dbt/trusted/ca.crt"), + ], + security_context=pod_security_context, + startup_timeout_seconds=600 + ) + run_dbt + kafka.py: | from 
airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger from airflow.providers.standard.operators.empty import EmptyOperator @@ -466,7 +566,6 @@ data: limit: 1024Mi replicas: 3 - # {% endraw %} --- apiVersion: v1 diff --git a/stacks/airflow/hive-metastores.yaml b/stacks/airflow/hive-metastores.yaml new file mode 100644 index 00000000..a908c0c1 --- /dev/null +++ b/stacks/airflow/hive-metastores.yaml @@ -0,0 +1,28 @@ +--- +apiVersion: hive.stackable.tech/v1alpha1 +kind: HiveCluster +metadata: + name: hive-iceberg +spec: + image: + productVersion: 4.1.0 + clusterConfig: + database: + connString: jdbc:postgresql://postgresql-hive-iceberg:5432/hive + dbType: postgres + credentialsSecret: postgres-credentials + s3: + reference: minio + metastore: + roleGroups: + default: + replicas: 1 +--- +apiVersion: v1 +kind: Secret +metadata: + name: postgres-credentials +type: Opaque +stringData: + username: hive + password: hive diff --git a/stacks/airflow/minio.yaml b/stacks/airflow/minio.yaml new file mode 100644 index 00000000..34089aac --- /dev/null +++ b/stacks/airflow/minio.yaml @@ -0,0 +1,715 @@ +--- +# Source: minio/templates/serviceaccount.yaml +apiVersion: v1 +kind: ServiceAccount +metadata: + name: "minio-sa" +--- +# Source: minio/templates/secrets.yaml +apiVersion: v1 +kind: Secret +metadata: + name: minio + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm +type: Opaque +data: + rootUser: "YWRtaW4=" + rootPassword: "YWRtaW5hZG1pbg==" +--- +# Source: minio/templates/configmap.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: minio + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm +data: + initialize: |- + #!/bin/sh + set -e # Have script exit in the event of a failed command. 
+ MC_CONFIG_DIR="/etc/minio/mc/" + MC="/usr/bin/mc --insecure --config-dir ${MC_CONFIG_DIR}" + + # connectToMinio + # Use a check-sleep-check loop to wait for MinIO service to be available + connectToMinio() { + SCHEME=$1 + ATTEMPTS=0 + LIMIT=29 # Allow 30 attempts + set -e # fail if we can't read the keys. + ACCESS=$(cat /config/rootUser) + SECRET=$(cat /config/rootPassword) + set +e # The connections to minio are allowed to fail. + echo "Connecting to MinIO server: $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT" + MC_COMMAND="${MC} alias set myminio $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT $ACCESS $SECRET" + $MC_COMMAND + STATUS=$? + until [ $STATUS = 0 ]; do + ATTEMPTS=$(expr $ATTEMPTS + 1) + echo \"Failed attempts: $ATTEMPTS\" + if [ $ATTEMPTS -gt $LIMIT ]; then + exit 1 + fi + sleep 2 # 1 second intervals between attempts + $MC_COMMAND + STATUS=$? + done + set -e # reset `e` as active + return 0 + } + + # checkBucketExists ($bucket) + # Check if the bucket exists, by using the exit code of `mc ls` + checkBucketExists() { + BUCKET=$1 + CMD=$(${MC} stat myminio/$BUCKET >/dev/null 2>&1) + return $? + } + + # createBucket ($bucket, $policy, $purge) + # Ensure bucket exists, purging if asked to + createBucket() { + BUCKET=$1 + POLICY=$2 + PURGE=$3 + VERSIONING=$4 + OBJECTLOCKING=$5 + + # Purge the bucket, if set & exists + # Since PURGE is user input, check explicitly for `true` + if [ $PURGE = true ]; then + if checkBucketExists $BUCKET; then + echo "Purging bucket '$BUCKET'." + set +e # don't exit if this fails + ${MC} rm -r --force myminio/$BUCKET + set -e # reset `e` as active + else + echo "Bucket '$BUCKET' does not exist, skipping purge." + fi + fi + + # Create the bucket if it does not exist and set objectlocking if enabled (NOTE: versioning will be not changed if OBJECTLOCKING is set because it enables versioning to the Buckets created) + if ! checkBucketExists $BUCKET; then + if [ ! 
-z $OBJECTLOCKING ]; then + if [ $OBJECTLOCKING = true ]; then + echo "Creating bucket with OBJECTLOCKING '$BUCKET'" + ${MC} mb --with-lock myminio/$BUCKET + elif [ $OBJECTLOCKING = false ]; then + echo "Creating bucket '$BUCKET'" + ${MC} mb myminio/$BUCKET + fi + elif [ -z $OBJECTLOCKING ]; then + echo "Creating bucket '$BUCKET'" + ${MC} mb myminio/$BUCKET + else + echo "Bucket '$BUCKET' already exists." + fi + fi + + # set versioning for bucket if objectlocking is disabled or not set + if [ $OBJECTLOCKING = false ]; then + if [ ! -z $VERSIONING ]; then + if [ $VERSIONING = true ]; then + echo "Enabling versioning for '$BUCKET'" + ${MC} version enable myminio/$BUCKET + elif [ $VERSIONING = false ]; then + echo "Suspending versioning for '$BUCKET'" + ${MC} version suspend myminio/$BUCKET + fi + fi + else + echo "Bucket '$BUCKET' versioning unchanged." + fi + + # At this point, the bucket should exist, skip checking for existence + # Set policy on the bucket + echo "Setting policy of bucket '$BUCKET' to '$POLICY'." + ${MC} anonymous set $POLICY myminio/$BUCKET + } + + # Try connecting to MinIO instance + scheme=https + connectToMinio $scheme + + + # Create the buckets + createBucket demo "public" false false false + createBucket airflow "public" false false false + + add-user: |- + #!/bin/sh + set -e ; # Have script exit in the event of a failed command. + MC_CONFIG_DIR="/etc/minio/mc/" + MC="/usr/bin/mc --insecure --config-dir ${MC_CONFIG_DIR}" + + # AccessKey and secretkey credentials file are added to prevent shell execution errors caused by special characters. + # Special characters for example : ',",<,>,{,} + MINIO_ACCESSKEY_SECRETKEY_TMP="/tmp/accessKey_and_secretKey_tmp" + + # connectToMinio + # Use a check-sleep-check loop to wait for MinIO service to be available + connectToMinio() { + SCHEME=$1 + ATTEMPTS=0 ; LIMIT=29 ; # Allow 30 attempts + set -e ; # fail if we can't read the keys. 
+ ACCESS=$(cat /config/rootUser) ; SECRET=$(cat /config/rootPassword) ; + set +e ; # The connections to minio are allowed to fail. + echo "Connecting to MinIO server: $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT" ; + MC_COMMAND="${MC} alias set myminio $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT $ACCESS $SECRET" ; + $MC_COMMAND ; + STATUS=$? ; + until [ $STATUS = 0 ] + do + ATTEMPTS=`expr $ATTEMPTS + 1` ; + echo \"Failed attempts: $ATTEMPTS\" ; + if [ $ATTEMPTS -gt $LIMIT ]; then + exit 1 ; + fi ; + sleep 2 ; # 1 second intervals between attempts + $MC_COMMAND ; + STATUS=$? ; + done ; + set -e ; # reset `e` as active + return 0 + } + + # checkUserExists () + # Check if the user exists, by using the exit code of `mc admin user info` + checkUserExists() { + CMD=$(${MC} admin user info myminio $(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) > /dev/null 2>&1) + return $? + } + + # createUser ($policy) + createUser() { + POLICY=$1 + #check accessKey_and_secretKey_tmp file + if [[ ! -f $MINIO_ACCESSKEY_SECRETKEY_TMP ]];then + echo "credentials file does not exist" + return 1 + fi + if [[ $(cat $MINIO_ACCESSKEY_SECRETKEY_TMP|wc -l) -ne 2 ]];then + echo "credentials file is invalid" + rm -f $MINIO_ACCESSKEY_SECRETKEY_TMP + return 1 + fi + USER=$(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) + # Create the user if it does not exist + if ! checkUserExists ; then + echo "Creating user '$USER'" + cat $MINIO_ACCESSKEY_SECRETKEY_TMP | ${MC} admin user add myminio + else + echo "User '$USER' already exists." + fi + #clean up credentials files. + rm -f $MINIO_ACCESSKEY_SECRETKEY_TMP + + # set policy for user + if [ ! -z $POLICY -a $POLICY != " " ] ; then + echo "Adding policy '$POLICY' for '$USER'" + set +e ; # policy already attach errors out, allow it. + ${MC} admin policy attach myminio $POLICY --user=$USER + set -e + else + echo "User '$USER' has no policy attached." 
+ fi + } + + # Try connecting to MinIO instance + scheme=https + connectToMinio $scheme + + + # Create the users + echo console > $MINIO_ACCESSKEY_SECRETKEY_TMP + echo console123 >> $MINIO_ACCESSKEY_SECRETKEY_TMP + createUser consoleAdmin + + add-policy: |- + #!/bin/sh + set -e ; # Have script exit in the event of a failed command. + MC_CONFIG_DIR="/etc/minio/mc/" + MC="/usr/bin/mc --insecure --config-dir ${MC_CONFIG_DIR}" + + # connectToMinio + # Use a check-sleep-check loop to wait for MinIO service to be available + connectToMinio() { + SCHEME=$1 + ATTEMPTS=0 ; LIMIT=29 ; # Allow 30 attempts + set -e ; # fail if we can't read the keys. + ACCESS=$(cat /config/rootUser) ; SECRET=$(cat /config/rootPassword) ; + set +e ; # The connections to minio are allowed to fail. + echo "Connecting to MinIO server: $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT" ; + MC_COMMAND="${MC} alias set myminio $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT $ACCESS $SECRET" ; + $MC_COMMAND ; + STATUS=$? ; + until [ $STATUS = 0 ] + do + ATTEMPTS=`expr $ATTEMPTS + 1` ; + echo \"Failed attempts: $ATTEMPTS\" ; + if [ $ATTEMPTS -gt $LIMIT ]; then + exit 1 ; + fi ; + sleep 2 ; # 1 second intervals between attempts + $MC_COMMAND ; + STATUS=$? ; + done ; + set -e ; # reset `e` as active + return 0 + } + + # checkPolicyExists ($policy) + # Check if the policy exists, by using the exit code of `mc admin policy info` + checkPolicyExists() { + POLICY=$1 + CMD=$(${MC} admin policy info myminio $POLICY > /dev/null 2>&1) + return $? + } + + # createPolicy($name, $filename) + createPolicy () { + NAME=$1 + FILENAME=$2 + + # Create the name if it does not exist + echo "Checking policy: $NAME (in /config/$FILENAME.json)" + if ! checkPolicyExists $NAME ; then + echo "Creating policy '$NAME'" + else + echo "Policy '$NAME' already exists." 
+ fi + ${MC} admin policy create myminio $NAME /config/$FILENAME.json + + } + + # Try connecting to MinIO instance + scheme=https + connectToMinio $scheme + + + add-svcacct: |- + #!/bin/sh + set -e ; # Have script exit in the event of a failed command. + MC_CONFIG_DIR="/etc/minio/mc/" + MC="/usr/bin/mc --insecure --config-dir ${MC_CONFIG_DIR}" + + # AccessKey and secretkey credentials file are added to prevent shell execution errors caused by special characters. + # Special characters for example : ',",<,>,{,} + MINIO_ACCESSKEY_SECRETKEY_TMP="/tmp/accessKey_and_secretKey_svcacct_tmp" + + # connectToMinio + # Use a check-sleep-check loop to wait for MinIO service to be available + connectToMinio() { + SCHEME=$1 + ATTEMPTS=0 ; LIMIT=29 ; # Allow 30 attempts + set -e ; # fail if we can't read the keys. + ACCESS=$(cat /config/rootUser) ; SECRET=$(cat /config/rootPassword) ; + set +e ; # The connections to minio are allowed to fail. + echo "Connecting to MinIO server: $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT" ; + MC_COMMAND="${MC} alias set myminio $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT $ACCESS $SECRET" ; + $MC_COMMAND ; + STATUS=$? ; + until [ $STATUS = 0 ] + do + ATTEMPTS=`expr $ATTEMPTS + 1` ; + echo \"Failed attempts: $ATTEMPTS\" ; + if [ $ATTEMPTS -gt $LIMIT ]; then + exit 1 ; + fi ; + sleep 2 ; # 2 second intervals between attempts + $MC_COMMAND ; + STATUS=$? ; + done ; + set -e ; # reset `e` as active + return 0 + } + + # checkSvcacctExists () + # Check if the svcacct exists, by using the exit code of `mc admin user svcacct info` + checkSvcacctExists() { + CMD=$(${MC} admin user svcacct info myminio $(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) > /dev/null 2>&1) + return $? + } + + # createSvcacct ($user) + createSvcacct () { + USER=$1 + FILENAME=$2 + #check accessKey_and_secretKey_tmp file + if [[ ! 
-f $MINIO_ACCESSKEY_SECRETKEY_TMP ]];then + echo "credentials file does not exist" + return 1 + fi + if [[ $(cat $MINIO_ACCESSKEY_SECRETKEY_TMP|wc -l) -ne 2 ]];then + echo "credentials file is invalid" + rm -f $MINIO_ACCESSKEY_SECRETKEY_TMP + return 1 + fi + SVCACCT=$(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) + # Create the svcacct if it does not exist + if ! checkSvcacctExists ; then + echo "Creating svcacct '$SVCACCT'" + # Check if policy file is define + if [ -z $FILENAME ]; then + ${MC} admin user svcacct add --access-key $(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) --secret-key $(tail -n1 $MINIO_ACCESSKEY_SECRETKEY_TMP) myminio $USER + else + ${MC} admin user svcacct add --access-key $(head -1 $MINIO_ACCESSKEY_SECRETKEY_TMP) --secret-key $(tail -n1 $MINIO_ACCESSKEY_SECRETKEY_TMP) --policy /config/$FILENAME.json myminio $USER + fi + else + echo "Svcacct '$SVCACCT' already exists." + fi + #clean up credentials files. + rm -f $MINIO_ACCESSKEY_SECRETKEY_TMP + } + + # Try connecting to MinIO instance + scheme=https + connectToMinio $scheme + + + custom-command: |- + #!/bin/sh + set -e ; # Have script exit in the event of a failed command. + MC_CONFIG_DIR="/etc/minio/mc/" + MC="/usr/bin/mc --insecure --config-dir ${MC_CONFIG_DIR}" + + # connectToMinio + # Use a check-sleep-check loop to wait for MinIO service to be available + connectToMinio() { + SCHEME=$1 + ATTEMPTS=0 ; LIMIT=29 ; # Allow 30 attempts + set -e ; # fail if we can't read the keys. + ACCESS=$(cat /config/rootUser) ; SECRET=$(cat /config/rootPassword) ; + set +e ; # The connections to minio are allowed to fail. + echo "Connecting to MinIO server: $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT" ; + MC_COMMAND="${MC} alias set myminio $SCHEME://$MINIO_ENDPOINT:$MINIO_PORT $ACCESS $SECRET" ; + $MC_COMMAND ; + STATUS=$? 
; + until [ $STATUS = 0 ] + do + ATTEMPTS=`expr $ATTEMPTS + 1` ; + echo \"Failed attempts: $ATTEMPTS\" ; + if [ $ATTEMPTS -gt $LIMIT ]; then + exit 1 ; + fi ; + sleep 2 ; # 1 second intervals between attempts + $MC_COMMAND ; + STATUS=$? ; + done ; + set -e ; # reset `e` as active + return 0 + } + + # runCommand ($@) + # Run custom mc command + runCommand() { + ${MC} "$@" + return $? + } + + # Try connecting to MinIO instance + scheme=https + connectToMinio $scheme +--- +# Source: minio/templates/pvc.yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: minio + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm +spec: + accessModes: + - "ReadWriteOnce" + resources: + requests: + storage: "10Gi" +--- +# Source: minio/templates/console-service.yaml +apiVersion: v1 +kind: Service +metadata: + name: minio-console + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm +spec: + type: NodePort + externalTrafficPolicy: "Cluster" + ports: + - name: https + port: 9001 + protocol: TCP + targetPort: 9001 + selector: + app: minio + release: minio +--- +# Source: minio/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: minio + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm + monitoring: "true" +spec: + type: NodePort + externalTrafficPolicy: "Cluster" + ports: + - name: https + port: 9000 + protocol: TCP + targetPort: 9000 + selector: + app: minio + release: minio +--- +# Source: minio/templates/deployment.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: minio + labels: + app: minio + chart: minio-5.4.0 + release: minio + heritage: Helm + stackable.tech/vendor: Stackable +spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 100% + maxUnavailable: 0 + replicas: 1 + selector: + matchLabels: + app: minio + release: minio + template: + metadata: + name: minio + labels: + app: minio + release: minio + stackable.tech/vendor: Stackable + 
annotations: + checksum/secrets: fa63e34a92c817c84057e2d452fa683e66462a57b0529388fb96a57e05f38e57 + checksum/config: ebea49cc4c1bfbd1b156a58bf770a776ff87fe199f642d31c2816b5515112e72 + spec: + securityContext: + + fsGroup: 1000 + fsGroupChangePolicy: OnRootMismatch + runAsGroup: 1000 + runAsUser: 1000 + + serviceAccountName: minio-sa + containers: + - name: minio + image: "quay.io/minio/minio:RELEASE.2024-12-18T13-15-44Z" + imagePullPolicy: IfNotPresent + command: + - "/bin/sh" + - "-ce" + - | + # minio requires the TLS key pair to be specially named + # mkdir -p /etc/minio/certs + cp -v /etc/minio/original_certs/tls.crt /etc/minio/certs/public.crt + cp -v /etc/minio/original_certs/tls.key /etc/minio/certs/private.key + + /usr/bin/docker-entrypoint.sh minio server /export -S /etc/minio/certs/ --address :9000 --console-address :9001 + volumeMounts: + - name: minio-user + mountPath: "/tmp/credentials" + readOnly: true + - name: export + mountPath: /export + - mountPath: /etc/minio/original_certs + name: tls + - mountPath: /etc/minio/certs + name: certs + ports: + - name: https + containerPort: 9000 + - name: https-console + containerPort: 9001 + env: + - name: MINIO_ROOT_USER + valueFrom: + secretKeyRef: + name: minio + key: rootUser + - name: MINIO_ROOT_PASSWORD + valueFrom: + secretKeyRef: + name: minio + key: rootPassword + - name: MINIO_PROMETHEUS_AUTH_TYPE + value: "public" + resources: + requests: + cpu: 1 + memory: 2Gi + securityContext: + readOnlyRootFilesystem: false + volumes: + - name: export + persistentVolumeClaim: + claimName: minio + - name: minio-user + secret: + secretName: minio + + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: service=minio + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1 + storageClassName: secrets.stackable.tech + name: tls + - emptyDir: + medium: Memory + sizeLimit: 5Mi + name: certs +--- +# Source: 
minio/templates/post-job.yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: minio-post-job + labels: + app: minio-post-job + chart: minio-5.4.0 + release: minio + heritage: Helm + annotations: + "helm.sh/hook": post-install,post-upgrade + "helm.sh/hook-delete-policy": hook-succeeded,before-hook-creation +spec: + backoffLimit: 50 + template: + metadata: + labels: + app: minio-job + release: minio + stackable.tech/vendor: Stackable + spec: + restartPolicy: OnFailure + volumes: + - name: etc-path + emptyDir: {} + - name: tmp + emptyDir: {} + - name: minio-configuration + projected: + sources: + - configMap: + name: minio + - secret: + name: minio + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: service=minio + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1 + storageClassName: secrets.stackable.tech + name: tls + - emptyDir: + medium: Memory + sizeLimit: 5Mi + name: certs + serviceAccountName: minio-sa + containers: + - name: minio-make-bucket + image: "quay.io/minio/mc:RELEASE.2024-11-21T17-21-54Z" + imagePullPolicy: IfNotPresent + command: + - "/bin/sh" + - "-ce" + - | + # Copy the CA cert from the "tls" SecretClass + # mkdir -p /etc/minio/mc/certs/CAs + cp -v /etc/minio/mc/original_certs/ca.crt /etc/minio/mc/certs/CAs/public.crt + + . 
/config/initialize + env: + - name: MINIO_ENDPOINT + value: minio + - name: MINIO_PORT + value: "9000" + volumeMounts: + - name: etc-path + mountPath: /etc/minio/mc + - name: tmp + mountPath: /tmp + - name: minio-configuration + mountPath: /config + - name: tls + mountPath: /etc/minio/mc/original_certs + - name: certs + mountPath: /etc/minio/mc/certs/CAs + resources: + requests: + memory: 128Mi + - name: minio-make-user + image: "quay.io/minio/mc:RELEASE.2024-11-21T17-21-54Z" + imagePullPolicy: IfNotPresent + command: + - "/bin/sh" + - "-ce" + - | + # Copy the CA cert from the "tls" SecretClass + # mkdir -p /etc/minio/mc/certs/CAs + cp -v /etc/minio/mc/original_certs/ca.crt /etc/minio/mc/certs/CAs/public.crt + + . /config/add-user + env: + - name: MINIO_ENDPOINT + value: minio + - name: MINIO_PORT + value: "9000" + volumeMounts: + - name: etc-path + mountPath: /etc/minio/mc + - name: tmp + mountPath: /tmp + - name: minio-configuration + mountPath: /config + - name: tls + mountPath: /etc/minio/mc/original_certs + - name: certs + mountPath: /etc/minio/mc/certs/CAs + resources: + requests: + memory: 128Mi diff --git a/stacks/airflow/opa.yaml b/stacks/airflow/opa.yaml index 783cdab4..79f089bc 100644 --- a/stacks/airflow/opa.yaml +++ b/stacks/airflow/opa.yaml @@ -2,7 +2,7 @@ apiVersion: opa.stackable.tech/v1alpha1 kind: OpaCluster metadata: - name: opa-airflow + name: opa spec: image: productVersion: 1.8.0 diff --git a/stacks/airflow/rbac.yaml b/stacks/airflow/rbac.yaml new file mode 100644 index 00000000..8c27213e --- /dev/null +++ b/stacks/airflow/rbac.yaml @@ -0,0 +1,24 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + namespace: default + name: airflow-events-reader +rules: +- apiGroups: [""] + resources: ["events"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: airflow-events-binding + namespace: default +subjects: +- kind: ServiceAccount + name: airflow-serviceaccount 
+ namespace: default +roleRef: + kind: Role + name: airflow-events-reader + apiGroup: rbac.authorization.k8s.io diff --git a/stacks/airflow/s3-connection.yaml b/stacks/airflow/s3-connection.yaml new file mode 100644 index 00000000..411d7a4d --- /dev/null +++ b/stacks/airflow/s3-connection.yaml @@ -0,0 +1,36 @@ +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio +spec: + host: minio.default.svc.cluster.local + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-s3-credentials + tls: + verification: + server: + caCert: + secretClass: tls +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-s3-credentials +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-s3-credentials + labels: + secrets.stackable.tech/class: minio-s3-credentials +stringData: + accessKey: admin + secretKey: {{ minioAdminPassword }} diff --git a/stacks/airflow/trino.yaml b/stacks/airflow/trino.yaml new file mode 100644 index 00000000..4bb023c1 --- /dev/null +++ b/stacks/airflow/trino.yaml @@ -0,0 +1,124 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino +spec: + image: + productVersion: "477" + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: trino + authentication: + - authenticationClass: trino-users + authorization: + opa: + configMapName: opa + package: trino + tls: + serverSecretClass: trino-tls + internalSecretClass: trino-internal-tls + coordinators: + roleGroups: + default: + replicas: 1 + roleConfig: + listenerClass: external-stable + workers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: trino-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-trino-tls-ca + namespace: default + autoGenerate: true +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass 
+metadata: + name: trino-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-trino-internal-tls-ca + namespace: default + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: trino-users +spec: + provider: + static: + userCredentialsSecret: + name: trino-users +--- +apiVersion: v1 +kind: Secret +metadata: + name: trino-users +type: kubernetes.io/opaque +stringData: + admin: "{{ trinoAdminPassword }}" +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: iceberg + labels: + trino: trino +spec: + connector: + iceberg: + metastore: + configMap: hive-iceberg + s3: + reference: minio +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: trino-opa-bundle + labels: + opa.stackable.tech/bundle: "trino" +data: + trino.rego: | + package trino + + default allow = false + + # Allow non-batched access + allow if { + is_admin + } + # Allow batched access + batch contains i if { + some i + input.action.filterResources[i] + is_admin + } + # Corner case: filtering columns is done with a single table item, and many columns inside + batch contains i if { + some i + input.action.operation == "FilterColumns" + count(input.action.filterResources) == 1 + input.action.filterResources[0].table.columns[i] + is_admin + } + + is_admin() if { + input.context.identity.user == "admin" + } diff --git a/stacks/stacks-v2.yaml b/stacks/stacks-v2.yaml index b1d68edb..e20b4790 100644 --- a/stacks/stacks-v2.yaml +++ b/stacks/stacks-v2.yaml @@ -142,20 +142,39 @@ stacks: - spark-k8s - kafka - opa + - hive + - trino labels: + - trino + - opa + - iceberg + - minio + - s3 - airflow manifests: - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml - - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml + - helmChart: 
https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-hive-iceberg.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/kafka.yaml - - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/opa.yaml - - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/opa-rules.yaml - supportedNamespaces: [] + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/trino-iceberg/s3-connection.yaml + - plainYaml: stacks/airflow/hive-metastores.yaml + - plainYaml: stacks/airflow/trino.yaml + - plainYaml: stacks/airflow/airflow.yaml + - plainYaml: stacks/airflow/rbac.yaml + - plainYaml: stacks/airflow/opa-rules.yaml + - plainYaml: stacks/airflow/opa.yaml + - plainYaml: stacks/airflow/minio.yaml + supportedNamespaces: ["default"] # auto-generated secret classes need to be templated resourceRequests: cpu: 3400m memory: 9010Mi pvc: 24Gi parameters: + - name: trinoAdminPassword + description: Password of the Trino admin user + default: adminadmin + - name: minioAdminPassword + description: Password of the MinIO admin user + default: adminadmin - name: airflowAdminPassword description: Password of the Airflow admin user default: adminadmin @@ -408,6 +427,48 @@ stacks: - name: minioAdminPassword description: Password of the MinIO admin user default: adminadmin + airflow-trino-dbt: + description: Stack containing Airflow and Trino using Apache Iceberg as a S3 backend for dbt jobs + stackableRelease: dev + stackableOperators: + - commons + - listener + - secret + - hive + - trino + - opa + - airflow + labels: + - trino + - opa + - iceberg + - minio + - s3 + manifests: + - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-hive-iceberg.yaml + - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml + - plainYaml: 
https://raw.githubusercontent.com/stackabletech/demos/main/stacks/trino-iceberg/s3-connection.yaml
+      - plainYaml: stacks/airflow-trino-dbt/hive-metastores.yaml
+      - plainYaml: stacks/airflow-trino-dbt/trino.yaml
+      - plainYaml: stacks/airflow-trino-dbt/airflow.yaml
+      - plainYaml: stacks/airflow-trino-dbt/rbac.yaml
+      - plainYaml: stacks/airflow-trino-dbt/opa-rules.yaml
+      - plainYaml: stacks/airflow-trino-dbt/opa.yaml
+      - plainYaml: stacks/airflow-trino-dbt/minio.yaml
+    supportedNamespaces: ["default"]
+    parameters:
+      - name: trinoAdminPassword
+        description: Password of the Trino admin user
+        default: adminadmin
+      - name: minioAdminPassword
+        description: Password of the MinIO admin user
+        default: adminadmin
+      - name: airflowAdminPassword
+        description: Password of the Airflow admin user
+        default: adminadmin
+      - name: airflowSecretKey
+        description: Airflow's secret key used to generate e.g. user session tokens
+        default: airflowSecretKey
   jupyterhub-pyspark-hdfs:
     description: Jupyterhub with PySpark and HDFS integration
     stackableRelease: dev