64 changes: 64 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/README.md
@@ -0,0 +1,64 @@
# pydabs_job_conditional_execution

This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks.

The Lakeflow Job consists of the following tasks:
1. Checks data quality and counts bad records
2. Evaluates whether the bad record count exceeds a threshold (100 records)
3. Routes to a different processing path based on the outcome (see the sketch below):
   - If bad records > 100: runs the `handle_bad_data` task
   - If bad records ≤ 100: runs the `continue_pipeline` task

* `src/`: Notebook source code for this project.
* `src/check_quality.ipynb`: Checks data quality and outputs bad record count
* `src/process_bad_data.ipynb`: Handles cases with high bad record count
* `src/process_good_data.ipynb`: Continues normal pipeline for good data
* `resources/`: Resource configurations (jobs, pipelines, etc.)
* `resources/conditional_execution.py`: PyDABs job definition with conditional tasks
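
The branching is wired together with task values: `src/check_quality.ipynb` publishes the bad record count, and the condition task defined in `resources/conditional_execution.py` compares it against the threshold through the dynamic value reference `{{tasks.check_data_quality.values.bad_records}}`. A condensed sketch of the notebook side (in a real job the count would come from an actual quality check rather than a hard-coded value):

```python
# Runs inside the check_data_quality notebook task; `dbutils` is provided by the
# Databricks notebook runtime.
bad_records_count = 150  # placeholder; compute this from your own quality checks

# Publish the count as a task value so downstream tasks can reference it as
# {{tasks.check_data_quality.values.bad_records}}.
dbutils.jobs.taskValues.set(key="bad_records", value=bad_records_count)
```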


## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace, see
https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code, see
https://docs.databricks.com/vscode-ext.

(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html

If you're developing with an IDE, dependencies for this project should be installed using uv:

* Make sure you have the uv package manager installed.
It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.
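
For example, from the project root (the `.venv` that `uv sync` creates is the one `databricks.yml` points at via `python.venv_path`):

```
$ uv sync --dev
$ source .venv/bin/activate   # optional: activate the environment for local tools like pytest
```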


## Using this project with the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working
with this project. It's also possible to interact with it directly using the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```

2. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)

This deploys everything that's defined for this project.
For example, it deploys a job called
`[dev yourname] pydabs_job_conditional_execution` to your workspace.
You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.

3. To run the job, use the "run" command:
```
$ databricks bundle run pydabs_job_conditional_execution
```
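
4. To check the bundle configuration, including the job defined in Python, without deploying, use the "validate" command:
```
$ databricks bundle validate --target dev
```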
21 changes: 21 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/databricks.yml
@@ -0,0 +1,21 @@
# This is a Databricks asset bundle definition for pydabs_job_conditional_execution.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: pydabs_job_conditional_execution

python:
venv_path: .venv
# Functions called to load resources defined in Python. See resources/__init__.py
resources:
- "resources:load_resources"

include:
- resources/*.yml
- resources/*/*.yml

targets:
dev:
mode: development
default: true
workspace:
host: https://myworkspace.databricks.com
26 changes: 26 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/pyproject.toml
@@ -0,0 +1,26 @@
[project]
name = "pydabs_job_conditional_execution"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
# Any dependencies for jobs and pipelines in this project can be added here
# See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
#
# LIMITATION: for pipelines, dependencies are cached during development;
    # add dependencies to the 'environment' section of the pipeline.yml file instead
]

[dependency-groups]
dev = [
"pytest",
"databricks-connect>=15.4,<15.5",
"databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
@@ -0,0 +1,16 @@
from databricks.bundles.core import (
Bundle,
Resources,
load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
"""
    The 'load_resources' function is referenced in databricks.yml and is responsible
    for loading bundle resources defined in Python code. It is called by the Databricks
    CLI during bundle deployment and is not used after deployment.
"""

    # The default implementation loads all Python files in the 'resources' directory.
return load_resources_from_current_package_module()
@@ -0,0 +1,36 @@
from databricks.bundles.jobs import (
ConditionTask,
Job,
NotebookTask,
Task,
TaskDependency,
)

pydabs_job_conditional_execution = Job(
name="pydabs_job_conditional_execution",
tasks=[
Task(
task_key="check_data_quality",
notebook_task=NotebookTask(notebook_path="src/check_quality.ipynb"),
),
Task(
task_key="evaluate_quality",
condition_task=ConditionTask(
left="{{tasks.check_data_quality.values.bad_records}}",
op="GREATER_THAN",
right="100",
),
depends_on=[TaskDependency(task_key="check_data_quality")],
),
Task(
task_key="handle_bad_data",
notebook_task=NotebookTask(notebook_path="src/process_bad_data.ipynb"),
depends_on=[TaskDependency(task_key="evaluate_quality", outcome="true")],
),
Task(
task_key="continue_pipeline",
notebook_task=NotebookTask(notebook_path="src/process_good_data.ipynb"),
depends_on=[TaskDependency(task_key="evaluate_quality", outcome="false")],
),
],
)
@@ -0,0 +1,56 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "cdf58588",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"outputs": [],
"source": [
"bad_records_count = 150"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "97594830",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"outputs": [],
"source": [
"dbutils.jobs.taskValues.set(\n",
" key='bad_records',\n",
" value=bad_records_count\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f48c4bc8",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"outputs": [],
"source": [
"print(f\"Found {bad_records_count} bad records\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -0,0 +1,25 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "2ed3f14d",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"outputs": [],
"source": [
"print(\"Processing Files...\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -0,0 +1,25 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "2ed3f14d",
"metadata": {
"vscode": {
"languageId": "plaintext"
}
},
"outputs": [],
"source": [
"print(\"Processing Files...\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
74 changes: 74 additions & 0 deletions knowledge_base/pydabs_job_file_arrival/README.md
@@ -0,0 +1,74 @@
# pydabs_job_file_arrival

This example demonstrates a Lakeflow Job that uses file arrival triggers to automatically process new files when they arrive in a Unity Catalog Volume.

The Lakeflow Job is configured with:
- **File arrival trigger**: Monitors a Unity Catalog Volume (root or subpath) for new files, recursively.
- **Configurable wait times**:
- Minimum time between triggers: 60 seconds
- Wait after last file change: 90 seconds (ensures file write is complete)
- **Automatic processing**: When files are detected, the job automatically runs and processes them

* `src/`: Notebook source code for this project.
* `src/process_files.ipynb`: Processes newly arrived files from the volume path.
* `resources/`: Resource configurations (jobs, pipelines, etc.)
* `resources/file_arrival.py`: PyDABs job with file arrival trigger configuration.
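
The trigger itself lives in `resources/file_arrival.py`. The following is a rough sketch of what such a PyDABs definition can look like, assuming the `databricks-bundles` dataclasses mirror the Jobs API names (`TriggerSettings`, `FileArrivalTriggerConfiguration`); see the actual file in this example for the authoritative definition:

```python
from databricks.bundles.jobs import (
    FileArrivalTriggerConfiguration,  # assumed class name, mirroring the Jobs API model
    Job,
    NotebookTask,
    Task,
    TriggerSettings,  # assumed class name, mirroring the Jobs API model
)

pydabs_job_file_arrival = Job(
    name="pydabs_job_file_arrival",
    trigger=TriggerSettings(
        file_arrival=FileArrivalTriggerConfiguration(
            # Unity Catalog Volume path to watch; placeholder, see step 3 below.
            url="/Volumes/your_catalog/your_schema/your_volume/",
            # Minimum time between triggers: 60 seconds.
            min_time_between_triggers_seconds=60,
            # Wait after the last file change: 90 seconds, so writes can complete.
            wait_after_last_change_seconds=90,
        ),
    ),
    tasks=[
        Task(
            # Illustrative task key; processes newly arrived files.
            task_key="process_files",
            notebook_task=NotebookTask(notebook_path="src/process_files.ipynb"),
        ),
    ],
)
```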


## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace, see
https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code, see
https://docs.databricks.com/vscode-ext.

(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html

If you're developing with an IDE, dependencies for this project should be installed using uv:

* Make sure you have the uv package manager installed.
It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.


## Using this project with the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working
with this project. It's also possible to interact with it directly using the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```

2. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)

This deploys everything that's defined for this project.
For example, it deploys a job called
`[dev yourname] pydabs_job_file_arrival` to your workspace.
You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.

3. To configure the volume location:
- Edit `resources/file_arrival.py` and update the `url` parameter to point to your Unity Catalog Volume:

```python
url="/Volumes/your_catalog/your_schema/your_volume/"
```

4. Development vs. production behavior:
   - Dev target (`mode: development`): schedules and automatic triggers are paused by design, so the job will not fire automatically on file arrival. To test the logic, run it manually:

```
$ databricks bundle run pydabs_job_file_arrival
```
   - Prod target (`mode: production`): automatic triggers are active. Uploading a file to the configured Unity Catalog Volume path triggers a job run the next time the trigger evaluates.
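
For example, once a `prod` target has been added to `databricks.yml` and deployed, uploading a file to the volume should cause a run (file name and volume path below are placeholders):
```
$ databricks bundle deploy --target prod
$ databricks fs cp ./new_data.csv dbfs:/Volumes/your_catalog/your_schema/your_volume/
```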

21 changes: 21 additions & 0 deletions knowledge_base/pydabs_job_file_arrival/databricks.yml
@@ -0,0 +1,21 @@
# This is a Databricks asset bundle definition for pydabs_job_file_arrival.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: pydabs_job_file_arrival

python:
venv_path: .venv
# Functions called to load resources defined in Python. See resources/__init__.py
resources:
- "resources:load_resources"

include:
- resources/*.yml
- resources/*/*.yml

targets:
dev:
mode: development
default: true
workspace:
host: https://myworkspace.databricks.com