diff --git a/knowledge_base/pydabs_job_conditional_execution/README.md b/knowledge_base/pydabs_job_conditional_execution/README.md new file mode 100644 index 0000000..c10e4a4 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/README.md @@ -0,0 +1,64 @@ +# pydabs_job_conditional_execution + +This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks. + +The Lakeflow Job consists of following tasks: +1. Checks data quality and calculates bad records +2. Evaluates if bad records exceed a threshold (100 records) +3. Routes to different processing paths based on the condition: + - If bad records > 100: runs `handle_bad_data` task + - If bad records ≤ 100: runs `continue_pipeline` task + +* `src/`: Notebook source code for this project. + * `src/check_quality.ipynb`: Checks data quality and outputs bad record count + * `src/process_bad_data.ipynb`: Handles cases with high bad record count + * `src/process_good_data.ipynb`: Continues normal pipeline for good data +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/conditional_execution.py`: PyDABs job definition with conditional tasks + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] pydabs_job_conditional_execution` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. To run the job, use the "run" command: + ``` + $ databricks bundle run pydabs_job_conditional_execution + ``` \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/databricks.yml b/knowledge_base/pydabs_job_conditional_execution/databricks.yml new file mode 100644 index 0000000..55aa093 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_conditional_execution + +python: + venv_path: .venv + # Functions called to load resources defined in Python. 
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/pyproject.toml b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml new file mode 100644 index 0000000..44bbd65 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_conditional_execution" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
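+
+    For illustration, any module-level resource defined in a Python file under 'resources/'
+    is discovered automatically. A hypothetical resources/my_job.py (not part of this
+    project) could therefore define an extra job like this:
+
+        from databricks.bundles.jobs import Job, NotebookTask, Task
+
+        my_job = Job(
+            name="my_job",
+            tasks=[
+                Task(
+                    task_key="main",
+                    notebook_task=NotebookTask(notebook_path="src/check_quality.ipynb"),
+                ),
+            ],
+        )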
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py new file mode 100644 index 0000000..7034b77 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py @@ -0,0 +1,36 @@ +from databricks.bundles.jobs import ( + ConditionTask, + Job, + NotebookTask, + Task, + TaskDependency, +) + +pydabs_job_conditional_execution = Job( + name="pydabs_job_conditional_execution", + tasks=[ + Task( + task_key="check_data_quality", + notebook_task=NotebookTask(notebook_path="src/check_quality.ipynb"), + ), + Task( + task_key="evaluate_quality", + condition_task=ConditionTask( + left="{{tasks.check_data_quality.values.bad_records}}", + op="GREATER_THAN", + right="100", + ), + depends_on=[TaskDependency(task_key="check_data_quality")], + ), + Task( + task_key="handle_bad_data", + notebook_task=NotebookTask(notebook_path="src/process_bad_data.ipynb"), + depends_on=[TaskDependency(task_key="evaluate_quality", outcome="true")], + ), + Task( + task_key="continue_pipeline", + notebook_task=NotebookTask(notebook_path="src/process_good_data.ipynb"), + depends_on=[TaskDependency(task_key="evaluate_quality", outcome="false")], + ), + ], +) diff --git a/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb new file mode 100644 index 0000000..4de9812 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb @@ -0,0 +1,56 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "cdf58588", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "bad_records_count = 150" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "97594830", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "dbutils.jobs.taskValues.set(\n", + " key='bad_records',\n", + " value=bad_records_count\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f48c4bc8", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(f\"Found {bad_records_count} bad records\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb new file mode 100644 index 0000000..66e15fa --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb @@ -0,0 +1,25 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb new file mode 100644 index 0000000..66e15fa --- /dev/null +++ 
b/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb @@ -0,0 +1,25 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_file_arrival/README.md b/knowledge_base/pydabs_job_file_arrival/README.md new file mode 100644 index 0000000..c0d8db6 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/README.md @@ -0,0 +1,74 @@ +# pydabs_job_file_arrival + +This example demonstrates a Lakeflow Job that uses file arrival triggers to automatically process new files when they arrive in a Unity Catalog Volume. + +The Lakeflow Job is configured with: +- **File arrival trigger**: Monitors a Unity Catalog Volume (root or subpath) for new files, recursively. +- **Configurable wait times**: + - Minimum time between triggers: 60 seconds + - Wait after last file change: 90 seconds (ensures file write is complete) +- **Automatic processing**: When files are detected, the job automatically runs and processes them + +* `src/`: Notebook source code for this project. + * `src/process_files.ipynb`: Processes newly arrived files from the volume path. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/file_arrival.py`: PyDABs job with file arrival trigger configuration. + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] pydabs_job_file_arrival` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. To configure the volume location: + - Edit `resources/file_arrival.py` and update the `url` parameter to point to your Unity Catalog Volume: + + ```python + url="/Volumes/your_catalog/your_schema/your_volume/" + ``` + +4. Development vs. Production behavior + - Dev target (mode: development): Schedules and automatic triggers are disabled by design, so the job will not auto-fire on file arrival. Use manual runs to test the logic. 
+ You can also manually run it with: + + ``` + $ databricks bundle run pydabs_job_file_arrival + ``` + - Prod target (mode: production): Automatic triggers are active. Uploading a file to the configured Unity Catalog Volume path will trigger the job run when the trigger evaluates. + \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/databricks.yml b/knowledge_base/pydabs_job_file_arrival/databricks.yml new file mode 100644 index 0000000..80dff3c --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_file_arrival + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/pyproject.toml b/knowledge_base/pydabs_job_file_arrival/pyproject.toml new file mode 100644 index 0000000..cf126d2 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_file_arrival" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_file_arrival/resources/__init__.py b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py new file mode 100644 index 0000000..022b5ce --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py @@ -0,0 +1,29 @@ +from databricks.bundles.jobs import ( + FileArrivalTriggerConfiguration, + Job, + NotebookTask, + Task, + TriggerSettings, +) + +pydabs_job_file_arrival = Job( + name="pydabs_job_file_arrival", + tasks=[ + Task( + task_key="process_new_files", + notebook_task=NotebookTask( + notebook_path="src/process_files.ipynb", + base_parameters={ + "file_arrival_location": "{{job.trigger.file_arrival.location}}" + }, + ), + ) + ], + trigger=TriggerSettings( + file_arrival=FileArrivalTriggerConfiguration( + url="/Volumes/your_catalog/your_schema/your_volume/", + min_time_between_triggers_seconds=60, + wait_after_last_change_seconds=90, + ), + ), +) diff --git a/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb b/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb new file mode 100644 index 0000000..ac29149 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb @@ -0,0 +1,40 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "file_arrival_location = dbutils.widgets.get(\"file_arrival_location\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17d7a5e3", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")\n", + "print(f\"File location: {file_arrival_location}\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_programmatic_generation/README.md b/knowledge_base/pydabs_job_programmatic_generation/README.md new file mode 100644 index 0000000..e124406 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/README.md @@ -0,0 +1,69 @@ +# pydabs_job_programmatic_generation + +This example demonstrates a simple Databricks job with programmatic generation and customization. + +* `src/`: Python source code for this project. + * `src/pydabs_job_programmatic_generation/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. 
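+
+Under the hood, this project generates its jobs at deployment time: `resources/__init__.py` creates
+one job for every SQL file found under `src/`. The snippet below is a simplified sketch of that
+pattern, not the exact code used here (the real implementation also creates a per-user schema for
+the `dev` target and passes its name as the `target_schema` parameter; `"my_schema"` below is only
+a placeholder value):
+
+```python
+import glob
+import os
+
+from databricks.bundles.core import Bundle, Resources
+
+
+def load_resources(bundle: Bundle) -> Resources:
+    resources = Resources()
+    for file in glob.glob("src/*.sql"):
+        # one job per SQL file, named after the file path
+        resources.add_job(
+            resource_name=os.path.basename(file).removesuffix(".sql"),
+            job={
+                "name": file,
+                "tasks": [
+                    {
+                        "task_key": "create_table",
+                        "sql_task": {
+                            "parameters": {"target_schema": "my_schema"},
+                            "file": {"path": file},
+                            "warehouse_id": "${resources.sql_warehouses.twoxs_warehouse.id}",
+                        },
+                    }
+                ],
+            },
+        )
+    return resources
+```
+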
It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy one job for each SQL file in `src/`, such as + `[dev yourname] src/tpcds_query1.sql`, to your workspace. + You can find those resources by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the generated jobs have no schedule. When deploying to the prod target, + the mutator in mutators.py adds failure email notifications to each job (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html for how + development and production modes differ). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_programmatic_generation/databricks.yml b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml new file mode 100644 index 0000000..263c202 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml @@ -0,0 +1,34 @@ +# This is a Databricks asset bundle definition for pydabs_job_programmatic_generation. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_programmatic_generation + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python.
See resources/__init__.py + resources: + - "resources:load_resources" + mutators: + - 'mutators:add_email_notifications' + +include: + - resources/*.yml + - resources/*/*.yml + +resources: + sql_warehouses: + twoxs_warehouse: + name: Serverless Starter Warehouse + cluster_size: 2X-Small + auto_stop_mins: 10 + +targets: + dev: + mode: development + default: true + + prod: + mode: production + workspace: + root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py new file mode 100644 index 0000000..0ef6172 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py @@ -0,0 +1,18 @@ +from dataclasses import replace + +from databricks.bundles.core import Bundle, job_mutator +from databricks.bundles.jobs import Job, JobEmailNotifications + + +@job_mutator +def add_email_notifications(bundle: Bundle, job: Job) -> Job: + if bundle.target == "dev": + return job + + email_notifications = JobEmailNotifications.from_dict( + { + "on_failure": ["${workspace.current_user.userName}"], + } + ) + + return replace(job, email_notifications=email_notifications) diff --git a/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml new file mode 100644 index 0000000..6bb9e9f --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_programmatic_generation" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py new file mode 100644 index 0000000..c6b6516 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py @@ -0,0 +1,58 @@ +import os +import glob + +from databricks.bundles.core import ( + Bundle, + Resources, +) +from databricks.bundles.schemas import Schema + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ + Instead of the default implementation, which loads every Python file in the 'resources' + directory via load_resources_from_current_package_module(), this project builds its + resources programmatically: one job is created for every SQL file in the src folder, + plus a schema for the dev target so that each user deploying the job gets their own schema. + """ + resources = Resources() + + target_schema_name = "target_prod_schema" # schema name used for prod - that schema is expected to be created outside this bundle (e.g. with Terraform) + + if bundle.target == "dev": + # create one schema per user in the dev target + # note databricks.yml: the dev target uses mode "development" + schema = Schema( + catalog_name="main", + name="prog_gen_target", + comment="Schema for output data", + ) + resources.add_schema(resource_name="project_schema", schema=schema) + target_schema_name = "${resources.schemas.project_schema.name}" + + for file in glob.glob("src/*.sql", recursive=True): + resources.add_job( + resource_name=os.path.basename(file).removesuffix(".sql"), + job={ + "name": file, + "tasks": [ + { + "task_key": "create_table", + "sql_task": { + "parameters": {"target_schema": target_schema_name}, + "file": { + "path": file, + }, + "warehouse_id": "${resources.sql_warehouses.twoxs_warehouse.id}", + }, + } + ], + }, + ) + + return resources diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py new file mode 100644 index 0000000..cd9ac48 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py @@ -0,0 +1,6 @@ +def main(): + pass + + +if __name__ == "__main__": + main() diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql new file mode 100644 index 0000000..7dfcf36 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql @@ -0,0 +1,42 @@ +USE CATALOG samples; +USE SCHEMA tpcds_sf1; + +CREATE OR REPLACE TABLE IDENTIFIER( + 'main.' || :target_schema || '.tpcds_query1' +) AS + WITH customer_total_return AS ( + SELECT + sr_customer_sk AS ctr_customer_sk, + sr_store_sk AS ctr_store_sk, + SUM(sr_return_amt) AS ctr_total_return + FROM + store_returns, + date_dim + WHERE + sr_returned_date_sk = d_date_sk + AND d_year = 2001 + GROUP BY + sr_customer_sk, + sr_store_sk + ) + SELECT + c_customer_id + FROM + customer_total_return ctr1, + store, + customer + WHERE + ctr1.ctr_total_return > ( + SELECT + AVG(ctr_total_return) * 1.2 + FROM + customer_total_return ctr2 + WHERE + ctr1.ctr_store_sk = ctr2.ctr_store_sk + ) + AND s_store_sk = ctr1.ctr_store_sk + AND s_state = 'TN' + AND ctr1.ctr_customer_sk = c_customer_sk + ORDER BY + c_customer_id + LIMIT 100; \ No newline at end of file diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql new file mode 100644 index 0000000..b7b10ac --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql @@ -0,0 +1,72 @@ +USE CATALOG samples; +USE SCHEMA tpcds_sf1; + +CREATE OR REPLACE TABLE IDENTIFIER( + 'main.'
|| :target_schema || '.tpcds_query2' +) +TBLPROPERTIES ( + 'delta.columnMapping.mode' = 'name' +) AS +WITH wscs AS ( + SELECT sold_date_sk, sales_price + FROM ( + SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price + FROM web_sales + UNION ALL + SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price + FROM catalog_sales + ) +), +wswscs AS ( + SELECT + d_week_seq, + SUM(CASE WHEN d_day_name = 'Sunday' THEN sales_price ELSE NULL END) AS sun_sales, + SUM(CASE WHEN d_day_name = 'Monday' THEN sales_price ELSE NULL END) AS mon_sales, + SUM(CASE WHEN d_day_name = 'Tuesday' THEN sales_price ELSE NULL END) AS tue_sales, + SUM(CASE WHEN d_day_name = 'Wednesday' THEN sales_price ELSE NULL END) AS wed_sales, + SUM(CASE WHEN d_day_name = 'Thursday' THEN sales_price ELSE NULL END) AS thu_sales, + SUM(CASE WHEN d_day_name = 'Friday' THEN sales_price ELSE NULL END) AS fri_sales, + SUM(CASE WHEN d_day_name = 'Saturday' THEN sales_price ELSE NULL END) AS sat_sales + FROM wscs + JOIN date_dim ON d_date_sk = sold_date_sk + GROUP BY d_week_seq +) +SELECT + d_week_seq1, + ROUND(sun_sales1 / sun_sales2, 2), + ROUND(mon_sales1 / mon_sales2, 2), + ROUND(tue_sales1 / tue_sales2, 2), + ROUND(wed_sales1 / wed_sales2, 2), + ROUND(thu_sales1 / thu_sales2, 2), + ROUND(fri_sales1 / fri_sales2, 2), + ROUND(sat_sales1 / sat_sales2, 2) +FROM ( + SELECT + wswscs.d_week_seq AS d_week_seq1, + sun_sales AS sun_sales1, + mon_sales AS mon_sales1, + tue_sales AS tue_sales1, + wed_sales AS wed_sales1, + thu_sales AS thu_sales1, + fri_sales AS fri_sales1, + sat_sales AS sat_sales1 + FROM wswscs + JOIN date_dim ON date_dim.d_week_seq = wswscs.d_week_seq + WHERE d_year = 1998 +) y +JOIN ( + SELECT + wswscs.d_week_seq AS d_week_seq2, + sun_sales AS sun_sales2, + mon_sales AS mon_sales2, + tue_sales AS tue_sales2, + wed_sales AS wed_sales2, + thu_sales AS thu_sales2, + fri_sales AS fri_sales2, + sat_sales AS sat_sales2 + FROM wswscs + JOIN date_dim ON date_dim.d_week_seq = wswscs.d_week_seq + WHERE d_year = 1999 +) z +ON d_week_seq1 = d_week_seq2 - 53 +ORDER BY d_week_seq1; \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_for_each/README.md b/knowledge_base/pydabs_job_with_for_each/README.md new file mode 100644 index 0000000..474327b --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/README.md @@ -0,0 +1,69 @@ +# pydabs_job_with_for_each + +This example demonstrates a simple Databricks job that uses a foreach task. + +* `src/`: Python source code for this project. + * `src/pydabs_job_with_for_each/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. 
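+
+Step 5 of the CLI walkthrough below runs `pytest`, but this example does not ship any tests yet.
+A minimal sketch of what a test for the Python-defined job could look like is shown here; the
+file name `test_for_each_job.py` (placed in the project root so that the `resources` package is
+importable) and the specific assertions are illustrative, and it assumes dev dependencies were
+installed with `uv sync --dev`:
+
+```python
+from resources.for_each_simple import for_each_example
+
+
+def test_for_each_wiring():
+    tasks = {task.task_key: task for task in for_each_example.tasks}
+
+    # the for-each task fans out over the indexes published by the extract task
+    assert tasks["process_item"].depends_on[0].task_key == "extract"
+    assert tasks["process_item"].for_each_task.inputs == "{{tasks.extract.values.indexes}}"
+    assert tasks["process_item"].for_each_task.concurrency == 10
+```
+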
It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] for_each_example` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, first add a `prod` target to databricks.yml and then type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the job defined in resources/for_each_simple.py has no schedule or trigger, + so it only runs when started manually (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html for how + development and production modes differ). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_for_each/databricks.yml b/knowledge_base/pydabs_job_with_for_each/databricks.yml new file mode 100644 index 0000000..690fc32 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/databricks.yml @@ -0,0 +1,20 @@ +# This is a Databricks asset bundle definition for pydabs_job_with_for_each. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_foreach + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python.
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_for_each/pyproject.toml b/knowledge_base/pydabs_job_with_for_each/pyproject.toml new file mode 100644 index 0000000..b6439ce --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_for_each" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_for_each/resources/__init__.py b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py new file mode 100644 index 0000000..ea3b7b9 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py @@ -0,0 +1,38 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, ForEachTask, TaskDependency + +extract = Task( + task_key="extract", + notebook_task=NotebookTask(notebook_path="src/notebook_extract.py"), +) +process_item_iteration = Task( + task_key="process_item_iteration", + notebook_task=NotebookTask( + notebook_path="src/notebook_process_item.py", + base_parameters={ + "index": "{{input}}", + }, + ), +) +process_item = Task( + task_key="process_item", + depends_on=[TaskDependency(task_key="extract")], + for_each_task=ForEachTask( + inputs="{{tasks.extract.values.indexes}}", + task=process_item_iteration, + concurrency=10, + ), +) + +for_each_example = Job( + name="for_each_example", + tasks=[ + extract, + process_item, + ], + parameters=[ + { + "name": "lookup_file_name", + "default": "/Volumes/main/for_each_example/hotchpotch/my_file.json", + }, + ], +) diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py new file mode 100644 index 0000000..ebda713 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py @@ -0,0 +1,18 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get("lookup_file_name") + +# COMMAND ---------- + +import json +from datetime import datetime, timedelta + +indexes = range(0, 10) +start_date = datetime.today() +data = [ + {"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")} + for index in indexes +] +dbutils.fs.put(lookup_file_name, json.dumps(data), overwrite=True) +dbutils.jobs.taskValues.set("indexes", list(indexes)) + +# COMMAND ---------- diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py new file mode 100644 index 0000000..ba439a8 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py @@ -0,0 +1,15 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get("lookup_file_name") +index = int(dbutils.widgets.get("index")) + +# COMMAND ---------- + +import json + +with open(lookup_file_name, "r") as f: + data = json.load(f) +date = data[index].get("date") + +print(date) + +# COMMAND ---------- diff --git a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py new file mode 100644 index 0000000..cd9ac48 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py @@ -0,0 +1,6 @@ +def main(): + pass + + +if __name__ == "__main__": + main() diff --git a/knowledge_base/pydabs_job_with_task_values/README.md b/knowledge_base/pydabs_job_with_task_values/README.md new file mode 100644 index 0000000..d2149f1 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/README.md @@ -0,0 +1,69 @@ 
+# pydabs_job_with_task_values + +This example demonstrates a simple Databricks job that uses task values to exchange information between tasks. + +* `src/`: Python source code for this project. + * `src/pydabs_job_with_task_values/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] task_values_simple` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, first add a `prod` target to databricks.yml and then type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the job defined in resources/task_values_simple.py has no schedule or trigger, + so it only runs when started manually (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html for how + development and production modes differ). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_task_values/databricks.yml b/knowledge_base/pydabs_job_with_task_values/databricks.yml new file mode 100644 index 0000000..82e52a5 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/databricks.yml @@ -0,0 +1,20 @@ +# This is a Databricks asset bundle definition for pydabs_job_with_task_values. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_task_values + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python.
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_task_values/pyproject.toml b/knowledge_base/pydabs_job_with_task_values/pyproject.toml new file mode 100644 index 0000000..c280862 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_task_values" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_task_values/resources/__init__.py b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py new file mode 100644 index 0000000..e47787d --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py @@ -0,0 +1,30 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, TaskDependency + +task_a = Task( + task_key="task_a", + notebook_task=NotebookTask(notebook_path="src/notebook_task_a.py"), +) +task_b = Task( + task_key="task_b", + depends_on=[TaskDependency(task_key="task_a")], + notebook_task=NotebookTask(notebook_path="src/notebook_task_b.py"), +) +task_c = Task( + task_key="task_c", + notebook_task=NotebookTask(notebook_path="src/notebook_task_c.py"), +) +task_d = Task( + task_key="task_d", + depends_on=[TaskDependency(task_key="task_c")], + notebook_task=NotebookTask(notebook_path="src/notebook_task_d.py"), +) + +task_values_simple = Job( + name="task_values_simple", + tasks=[ + task_a, + task_b, + task_c, + task_d, + ], +) diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py new file mode 100644 index 0000000..63e4e2e --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py @@ -0,0 +1,3 @@ +# Databricks notebook source +val = [42, 12, 1812] +dbutils.jobs.taskValues.set(key="my_key", value=val) diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py new file mode 100644 index 0000000..e49eeb3 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py @@ -0,0 +1,3 @@ +# Databricks notebook source +val = dbutils.jobs.taskValues.get(taskKey="task_a", key="my_key") +print(val) diff --git a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/__init__.py b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py new file mode 100644 index 0000000..cd9ac48 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py @@ -0,0 +1,6 @@ +def main(): + pass + + +if __name__ == "__main__": + main()
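
A note on developing the task-value notebooks interactively: `dbutils.jobs.taskValues.get` raises an error when a notebook runs outside of a job unless a `debugValue` is supplied. A small sketch of how `src/notebook_task_b.py` could be adapted for interactive runs (the fallback values shown are arbitrary placeholders, not part of this project):

```python
# Databricks notebook source
# Inside a job run, this returns the value set by task_a (or `default` if the key
# is missing); outside of a job run, `debugValue` is returned instead of raising.
val = dbutils.jobs.taskValues.get(
    taskKey="task_a",
    key="my_key",
    default=[],
    debugValue=[42, 12, 1812],
)
print(val)
```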