From 1f27f62bf0da8431617bbdbf1f4693d95d92cf14 Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Mon, 17 Nov 2025 20:29:54 +0100 Subject: [PATCH 1/7] pydabs for job with task values and job with for each --- .../pydabs_job_with_for_each/README.md | 69 +++++++++++++++++++ .../pydabs_job_with_for_each/databricks.yml | 20 ++++++ .../pydabs_job_with_for_each/pyproject.toml | 26 +++++++ .../resources/__init__.py | 16 +++++ .../resources/for_each_simple.py | 38 ++++++++++ .../src/notebook_extract.py | 16 +++++ .../src/notebook_process_item.py | 16 +++++ .../src/pydabs_job_with_for_each/__init__.py | 0 .../src/pydabs_job_with_for_each/main.py | 5 ++ .../pydabs_job_with_task_values/README.md | 69 +++++++++++++++++++ .../databricks.yml | 20 ++++++ .../pyproject.toml | 26 +++++++ .../resources/__init__.py | 16 +++++ .../resources/task_values_simple.py | 19 +++++ .../src/notebook_task_a.py | 3 + .../src/notebook_task_b.py | 3 + .../pydabs_job_with_task_values/__init__.py | 0 .../src/pydabs_job_with_task_values/main.py | 5 ++ 18 files changed, 367 insertions(+) create mode 100644 knowledge_base/pydabs_job_with_for_each/README.md create mode 100644 knowledge_base/pydabs_job_with_for_each/databricks.yml create mode 100644 knowledge_base/pydabs_job_with_for_each/pyproject.toml create mode 100644 knowledge_base/pydabs_job_with_for_each/resources/__init__.py create mode 100644 knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py create mode 100644 knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py create mode 100644 knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py create mode 100644 knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py create mode 100644 knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py create mode 100644 knowledge_base/pydabs_job_with_task_values/README.md create mode 100644 knowledge_base/pydabs_job_with_task_values/databricks.yml create mode 100644 knowledge_base/pydabs_job_with_task_values/pyproject.toml create mode 100644 knowledge_base/pydabs_job_with_task_values/resources/__init__.py create mode 100644 knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py create mode 100644 knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py create mode 100644 knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py create mode 100644 knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/__init__.py create mode 100644 knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py diff --git a/knowledge_base/pydabs_job_with_for_each/README.md b/knowledge_base/pydabs_job_with_for_each/README.md new file mode 100644 index 0000000..474327b --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/README.md @@ -0,0 +1,69 @@ +# pydabs_job_with_for_each + +This example demonstrates a simple Databricks job that uses a foreach task. + +* `src/`: Python source code for this project. + * `src/pydabs_job_with_for_each/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. 
+ +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, the default template would deploy a job called + `[dev yourname] pydabs_airflow_job` to your workspace. + You can find that resource by opening your workpace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the default job from the template has a schedule that runs every day + (defined in resources/sample_job.job.yml). The schedule + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_for_each/databricks.yml b/knowledge_base/pydabs_job_with_for_each/databricks.yml new file mode 100644 index 0000000..690fc32 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/databricks.yml @@ -0,0 +1,20 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_foreach + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python. 
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_for_each/pyproject.toml b/knowledge_base/pydabs_job_with_for_each/pyproject.toml new file mode 100644 index 0000000..b6439ce --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_for_each" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_for_each/resources/__init__.py b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
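+
+    Resources returned here are combined with any resources defined in the YAML
+    files matched by the 'include' section of databricks.yml.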
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py new file mode 100644 index 0000000..fad4215 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py @@ -0,0 +1,38 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, ForEachTask, TaskDependency + +extract = Task( + task_key="extract", + notebook_task=NotebookTask(notebook_path="src/notebook_extract.py"), +) +process_item_iteration = Task( + task_key="process_item_iteration", + notebook_task=NotebookTask( + notebook_path="src/notebook_process_item.py", + base_parameters={ + "index": "{{input}}", + }, + ), +) +process_item = Task( + task_key='process_item', + depends_on=[TaskDependency(task_key="extract")], + for_each_task=ForEachTask( + inputs='{{tasks.extract.values.indexes}}', + task=process_item_iteration, + concurrency=10 + ) +) + +for_each_example = Job( + name="for_each_example", + tasks=[ + extract, + process_item, + ], + parameters=[ + { + "name": "lookup_file_name", + "default": "/Volumes/main/for_each_example/hotchpotch/my_file.json", + }, + ], +) diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py new file mode 100644 index 0000000..16c8b1e --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py @@ -0,0 +1,16 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get('lookup_file_name') + +# COMMAND ---------- + +import json +from datetime import datetime, timedelta + +indexes = range(0,10) +start_date = datetime.today() +data = [{"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")} for index in indexes] +dbutils.fs.put(lookup_file_name, json.dumps(data), overwrite=True) +dbutils.jobs.taskValues.set("indexes", list(indexes)) + +# COMMAND ---------- + diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py new file mode 100644 index 0000000..a878dcc --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py @@ -0,0 +1,16 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get('lookup_file_name') +index = int(dbutils.widgets.get('index')) + +# COMMAND ---------- + +import json + +with open(lookup_file_name, "r") as f: + data = json.load(f) +date = data[index].get("date") + +print(date) + +# COMMAND ---------- + diff --git a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py new file mode 100644 index 0000000..3339ef2 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py @@ -0,0 +1,5 @@ +def main(): + pass + +if __name__ == "__main__": + main() diff --git a/knowledge_base/pydabs_job_with_task_values/README.md b/knowledge_base/pydabs_job_with_task_values/README.md new file mode 100644 index 0000000..d2149f1 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/README.md @@ -0,0 +1,69 @@ +# 
pydabs_job_with_task_values + +This example demonstrates a simple Databricks job that uses tasks values to exchange info. + +* `src/`: Python source code for this project. + * `src/pydabs_job_with_task_values/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, the default template would deploy a job called + `[dev yourname] pydabs_airflow_job` to your workspace. + You can find that resource by opening your workpace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the default job from the template has a schedule that runs every day + (defined in resources/sample_job.job.yml). The schedule + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_task_values/databricks.yml b/knowledge_base/pydabs_job_with_task_values/databricks.yml new file mode 100644 index 0000000..82e52a5 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/databricks.yml @@ -0,0 +1,20 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_task_values + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python. 
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_task_values/pyproject.toml b/knowledge_base/pydabs_job_with_task_values/pyproject.toml new file mode 100644 index 0000000..c280862 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_task_values" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_task_values/resources/__init__.py b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py new file mode 100644 index 0000000..45e91f4 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py @@ -0,0 +1,19 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, TaskDependency + +task_a = Task( + task_key="task_a", + notebook_task=NotebookTask(notebook_path="src/notebook_task_a.py"), +) +task_b = Task( + task_key="task_b", + depends_on=[TaskDependency(task_key="task_a")], + notebook_task=NotebookTask(notebook_path="src/notebook_task_b.py"), +) + +task_values_simple=Job( + name="task_values_simple", + tasks=[ + task_a, + task_b, + ], +) diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py new file mode 100644 index 0000000..c3715d0 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py @@ -0,0 +1,3 @@ +# Databricks notebook source +val = [42, 12, 1812] +dbutils.jobs.taskValues.set(key="my_key", value=val) \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py new file mode 100644 index 0000000..578de83 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py @@ -0,0 +1,3 @@ +# Databricks notebook source +val = dbutils.jobs.taskValues.get(taskKey="task_a", key="my_key") +print(val) \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/__init__.py b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py new file mode 100644 index 0000000..3339ef2 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py @@ -0,0 +1,5 @@ +def main(): + pass + +if __name__ == "__main__": + main() From 46422feaf8638151b32580593bc0fbf2e6320a6e Mon Sep 17 00:00:00 2001 From: Zanita Rahimi Date: Tue, 18 Nov 2025 13:29:50 +0100 Subject: [PATCH 2/7] adding conditional execution and file arrival scripts --- .../README.md | 64 ++++++++++++++++ .../databricks.yml | 21 +++++ .../pyproject.toml | 26 +++++++ .../resources/__init__.py | 16 ++++ .../resources/conditional_execution.py | 54 +++++++++++++ .../src/check_quality.ipynb | 56 ++++++++++++++ .../src/process_bad_data.ipynb | 25 ++++++ .../src/process_good_data.ipynb | 25 ++++++ .../pydabs_job_file_arrival/README.md | 76 +++++++++++++++++++ .../pydabs_job_file_arrival/databricks.yml | 21 +++++ .../pydabs_job_file_arrival/pyproject.toml | 26 +++++++ .../resources/__init__.py | 16 ++++ .../resources/file_arrival.py | 27 +++++++ .../src/process_files.ipynb | 40 ++++++++++ 14 files changed, 493 insertions(+) create mode 100644 knowledge_base/pydabs_job_conditional_execution/README.md create mode 100644 knowledge_base/pydabs_job_conditional_execution/databricks.yml create mode 100644 knowledge_base/pydabs_job_conditional_execution/pyproject.toml create mode 100644 
knowledge_base/pydabs_job_conditional_execution/resources/__init__.py create mode 100644 knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py create mode 100644 knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb create mode 100644 knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb create mode 100644 knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb create mode 100644 knowledge_base/pydabs_job_file_arrival/README.md create mode 100644 knowledge_base/pydabs_job_file_arrival/databricks.yml create mode 100644 knowledge_base/pydabs_job_file_arrival/pyproject.toml create mode 100644 knowledge_base/pydabs_job_file_arrival/resources/__init__.py create mode 100644 knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py create mode 100644 knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb diff --git a/knowledge_base/pydabs_job_conditional_execution/README.md b/knowledge_base/pydabs_job_conditional_execution/README.md new file mode 100644 index 0000000..c10e4a4 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/README.md @@ -0,0 +1,64 @@ +# pydabs_job_conditional_execution + +This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks. + +The Lakeflow Job consists of following tasks: +1. Checks data quality and calculates bad records +2. Evaluates if bad records exceed a threshold (100 records) +3. Routes to different processing paths based on the condition: + - If bad records > 100: runs `handle_bad_data` task + - If bad records ≤ 100: runs `continue_pipeline` task + +* `src/`: Notebook source code for this project. + * `src/check_quality.ipynb`: Checks data quality and outputs bad record count + * `src/process_bad_data.ipynb`: Handles cases with high bad record count + * `src/process_good_data.ipynb`: Continues normal pipeline for good data +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/conditional_execution.py`: PyDABs job definition with conditional tasks + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] pydabs_job_conditional_execution` to your workspace. 
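+   (The `[dev yourname]` prefix is added automatically because the `dev` target
+   uses `mode: development`.)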
+ You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. To run the job, use the "run" command: + ``` + $ databricks bundle run pydabs_job_conditional_execution + ``` \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/databricks.yml b/knowledge_base/pydabs_job_conditional_execution/databricks.yml new file mode 100644 index 0000000..55aa093 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_conditional_execution + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/pyproject.toml b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml new file mode 100644 index 0000000..44bbd65 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_conditional_execution" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py new file mode 100644 index 0000000..f4242ec --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py @@ -0,0 +1,54 @@ +from databricks.bundles.jobs import ( + ConditionTask, + Job, + NotebookTask, + Task, + TaskDependency, +) + +pydabs_job_conditional_execution = Job( + name="pydabs_job_conditional_execution", + tasks=[ + Task( + task_key="check_data_quality", + notebook_task=NotebookTask( + notebook_path="src/check_quality.ipynb" + ), + ), + Task( + task_key="evaluate_quality", + condition_task=ConditionTask( + left="{{tasks.check_data_quality.values.bad_records}}", + op="GREATER_THAN", + right="100" + ), + depends_on=[ + TaskDependency(task_key="check_data_quality") + ], + ), + Task( + task_key="handle_bad_data", + notebook_task=NotebookTask( + notebook_path="src/process_bad_data.ipynb" + ), + depends_on=[ + TaskDependency( + task_key="evaluate_quality", + outcome="true" + ) + ], + ), + Task( + task_key="continue_pipeline", + notebook_task=NotebookTask( + notebook_path="src/process_good_data.ipynb" + ), + depends_on=[ + TaskDependency( + task_key="evaluate_quality", + outcome="false" + ) + ], + ) + ] +) diff --git a/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb new file mode 100644 index 0000000..4de9812 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/check_quality.ipynb @@ -0,0 +1,56 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "cdf58588", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "bad_records_count = 150" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "97594830", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "dbutils.jobs.taskValues.set(\n", + " key='bad_records',\n", + " value=bad_records_count\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f48c4bc8", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(f\"Found {bad_records_count} bad records\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb new file mode 100644 index 0000000..66e15fa --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/process_bad_data.ipynb @@ -0,0 +1,25 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb b/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb new file mode 100644 index 0000000..66e15fa --- /dev/null +++ 
b/knowledge_base/pydabs_job_conditional_execution/src/process_good_data.ipynb @@ -0,0 +1,25 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/knowledge_base/pydabs_job_file_arrival/README.md b/knowledge_base/pydabs_job_file_arrival/README.md new file mode 100644 index 0000000..8ce71fe --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/README.md @@ -0,0 +1,76 @@ +# pydabs_job_file_arrival + +This example demonstrates a Lakeflow Job that uses file arrival triggers to automatically process new files when they arrive in a Unity Catalog Volume. + +The Lakeflow Job is configured with: +- **File arrival trigger**: Monitors a Unity Catalog Volume (root or subpath) for new files, recursively. +- **Configurable wait times**: + - Minimum time between triggers: 60 seconds + - Wait after last file change: 90 seconds (ensures file write is complete) +- **Automatic processing**: When files are detected, the job automatically runs and processes them + +* `src/`: Notebook source code for this project. + * `src/process_files.ipynb`: Processes newly arrived files from the volume path. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/file_arrival.py`: PyDABs job with file arrival trigger configuration. + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +## Development vs. Production behavior + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] pydabs_job_file_arrival` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. To configure the volume location: + - Edit `resources/file_arrival.py` and update the `url` parameter to point to your Unity Catalog Volume: + + ```python + url="/Volumes/your_catalog/your_schema/your_volume/" + ``` + +4. Development vs. Production behavior + - Dev target (mode: development): Schedules and automatic triggers are disabled by design, so the job will not auto-fire on file arrival. Use manual runs to test the logic. 
+ You can also manually run it with: + + ``` + $ databricks bundle run pydabs_job_file_arrival + ``` + - Prod target (mode: production): Automatic triggers are active. Uploading a file to the configured Unity Catalog Volume path will trigger the job run when the trigger evaluates. + \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/databricks.yml b/knowledge_base/pydabs_job_file_arrival/databricks.yml new file mode 100644 index 0000000..80dff3c --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_file_arrival + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/pyproject.toml b/knowledge_base/pydabs_job_file_arrival/pyproject.toml new file mode 100644 index 0000000..cf126d2 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_file_arrival" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_file_arrival/resources/__init__.py b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py new file mode 100644 index 0000000..58ddb45 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py @@ -0,0 +1,27 @@ +from databricks.bundles.jobs import ( + FileArrivalTriggerConfiguration, + Job, + NotebookTask, + Task, + TriggerSettings, +) + +pydabs_job_file_arrival = Job( + name="pydabs_job_file_arrival", + tasks=[ + Task( + task_key="process_new_files", + notebook_task=NotebookTask( + notebook_path="src/process_files.ipynb", + base_parameters={"file_arrival_location": "{{job.trigger.file_arrival.location}}"}, + ), + ) + ], + trigger=TriggerSettings( + file_arrival=FileArrivalTriggerConfiguration( + url="/Volumes/your_catalog/your_schema/your_volume/", + min_time_between_triggers_seconds=60, + wait_after_last_change_seconds=90, + ), + ), +) \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb b/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb new file mode 100644 index 0000000..ac29149 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/src/process_files.ipynb @@ -0,0 +1,40 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "2ed3f14d", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "file_arrival_location = dbutils.widgets.get(\"file_arrival_location\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17d7a5e3", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "print(\"Processing Files...\")\n", + "print(f\"File location: {file_arrival_location}\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From f841d1ce19a1e09b20fdd2e9351bd3eaa5f921d7 Mon Sep 17 00:00:00 2001 From: Zanita Rahimi Date: Tue, 18 Nov 2025 13:34:39 +0100 Subject: [PATCH 3/7] adding conditional execution and file arrival scripts --- knowledge_base/pydabs_job_file_arrival/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/knowledge_base/pydabs_job_file_arrival/README.md b/knowledge_base/pydabs_job_file_arrival/README.md index 8ce71fe..c0d8db6 100644 --- a/knowledge_base/pydabs_job_file_arrival/README.md +++ b/knowledge_base/pydabs_job_file_arrival/README.md @@ -36,8 +36,6 @@ If you're developing with an IDE, dependencies for this project should be instal # Using this project using the CLI -## Development vs. Production behavior - The Databricks workspace and IDE extensions provide a graphical interface for working with this project. 
It's also possible to interact with it directly using the CLI: From 0d789ab31437f26ca825b1574166b77ca98f73af Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Fri, 21 Nov 2025 19:31:58 +0100 Subject: [PATCH 4/7] pydabs programmatic generation -initial version --- .../README.md | 69 +++++++++++++++++++ .../databricks.yml | 22 ++++++ .../mutators.py | 21 ++++++ .../pyproject.toml | 26 +++++++ .../resources/__init__.py | 40 +++++++++++ .../src/notebook_extract.py | 16 +++++ .../src/notebook_process_item.py | 16 +++++ .../__init__.py | 0 .../main.py | 5 ++ 9 files changed, 215 insertions(+) create mode 100644 knowledge_base/pydabs_job_programmatic_generation/README.md create mode 100644 knowledge_base/pydabs_job_programmatic_generation/databricks.yml create mode 100644 knowledge_base/pydabs_job_programmatic_generation/mutators.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/pyproject.toml create mode 100644 knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py diff --git a/knowledge_base/pydabs_job_programmatic_generation/README.md b/knowledge_base/pydabs_job_programmatic_generation/README.md new file mode 100644 index 0000000..7e45b14 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/README.md @@ -0,0 +1,69 @@ +# pydabs_job_with_for_each + +This example demonstrates a simple Databricks job with programmatic generation and customization. + +* `src/`: Python source code for this project. + * `src/pydabs_job_programmatic_generation/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, the default template would deploy a job called + `[dev yourname] pydabs_airflow_job` to your workspace. 
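+   (In this project there is no static job definition: `load_resources` in
+   `resources/__init__.py` generates the jobs programmatically, one per file
+   found in `src/`, named after that file.)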
+ You can find that resource by opening your workpace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the default job from the template has a schedule that runs every day + (defined in resources/sample_job.job.yml). The schedule + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_programmatic_generation/databricks.yml b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml new file mode 100644 index 0000000..a4f07de --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml @@ -0,0 +1,22 @@ +# This is a Databricks asset bundle definition for pydabs_airflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_programmatic_generation + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + mutators: + - 'mutators:add_email_notifications' + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py new file mode 100644 index 0000000..201b6f4 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py @@ -0,0 +1,21 @@ +from dataclasses import replace + + +from databricks.bundles.core import Bundle, job_mutator, m +from databricks.bundles.jobs import Job, JobEmailNotifications + + +@job_mutator +def add_email_notifications(bundle: Bundle, job: Job) -> Job: + if job.email_notifications: + return job + + + email_notifications = JobEmailNotifications.from_dict( + { + "on_failure": ["${workspace.current_user.userName}"], + } + ) + + + return replace(job, email_notifications=email_notifications) diff --git a/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml new file mode 100644 index 0000000..6bb9e9f --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_programmatic_generation" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py new file mode 100644 index 0000000..d72e7d2 --- /dev/null +++ 
b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py @@ -0,0 +1,40 @@ +import os +import glob + +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. + """ + + # the default implementation loads all Python files in 'resources' directory + # return load_resources_from_current_package_module() + + """ + load_resources() is called during bundle deployment + Here a job is created for every notebook in the src folder + """ + resources = Resources() + for file in glob.glob("src/notebook*.py", recursive=True): + resources.add_job( + resource_name=os.path.basename(file).removesuffix(".py"), + job={ + "name": file, + "tasks": [ + { + "task_key": "notebook_task", + "notebook_task": {"notebook_path": file}, + }, + ], + }, + ) + + return resources diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py new file mode 100644 index 0000000..16c8b1e --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py @@ -0,0 +1,16 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get('lookup_file_name') + +# COMMAND ---------- + +import json +from datetime import datetime, timedelta + +indexes = range(0,10) +start_date = datetime.today() +data = [{"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")} for index in indexes] +dbutils.fs.put(lookup_file_name, json.dumps(data), overwrite=True) +dbutils.jobs.taskValues.set("indexes", list(indexes)) + +# COMMAND ---------- + diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py new file mode 100644 index 0000000..a878dcc --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py @@ -0,0 +1,16 @@ +# Databricks notebook source +lookup_file_name = dbutils.widgets.get('lookup_file_name') +index = int(dbutils.widgets.get('index')) + +# COMMAND ---------- + +import json + +with open(lookup_file_name, "r") as f: + data = json.load(f) +date = data[index].get("date") + +print(date) + +# COMMAND ---------- + diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py new file mode 100644 index 0000000..3339ef2 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py @@ -0,0 +1,5 @@ +def main(): + pass + +if __name__ == "__main__": + main() From e66e1b697b72ac809e5bb39661ac469f1bc985fd Mon Sep 17 00:00:00 2001 From: Lorenzo Rubio Date: Mon, 24 Nov 2025 18:02:06 +0100 Subject: [PATCH 5/7] programmatic customization of pydabs --- .../README.md | 2 +- .../databricks.yml | 14 +++- .../mutators.py | 8 +-- 
.../resources/__init__.py | 47 ++++++++---- .../src/notebook_extract.py | 16 ----- .../src/notebook_process_item.py | 16 ----- .../src/tpcds_query1.sql | 42 +++++++++++ .../src/tpcds_query2.sql | 72 +++++++++++++++++++ 8 files changed, 165 insertions(+), 52 deletions(-) delete mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py delete mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql create mode 100644 knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql diff --git a/knowledge_base/pydabs_job_programmatic_generation/README.md b/knowledge_base/pydabs_job_programmatic_generation/README.md index 7e45b14..e124406 100644 --- a/knowledge_base/pydabs_job_programmatic_generation/README.md +++ b/knowledge_base/pydabs_job_programmatic_generation/README.md @@ -1,4 +1,4 @@ -# pydabs_job_with_for_each +# pydabs_job_programmatic_generation This example demonstrates a simple Databricks job with programmatic generation and customization. diff --git a/knowledge_base/pydabs_job_programmatic_generation/databricks.yml b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml index a4f07de..263c202 100644 --- a/knowledge_base/pydabs_job_programmatic_generation/databricks.yml +++ b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml @@ -16,7 +16,19 @@ include: - resources/*.yml - resources/*/*.yml +resources: + sql_warehouses: + twoxs_warehouse: + name: Serverless Starter Warehouse + cluster_size: 2X-Small + auto_stop_mins: 10 + targets: dev: mode: development - default: true \ No newline at end of file + default: true + + prod: + mode: production + workspace: + root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py index 201b6f4..2e73300 100644 --- a/knowledge_base/pydabs_job_programmatic_generation/mutators.py +++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py @@ -1,21 +1,17 @@ from dataclasses import replace - -from databricks.bundles.core import Bundle, job_mutator, m +from databricks.bundles.core import Bundle, job_mutator, mu from databricks.bundles.jobs import Job, JobEmailNotifications - @job_mutator def add_email_notifications(bundle: Bundle, job: Job) -> Job: - if job.email_notifications: + if bundle.target == 'dev': return job - email_notifications = JobEmailNotifications.from_dict( { "on_failure": ["${workspace.current_user.userName}"], } ) - return replace(job, email_notifications=email_notifications) diff --git a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py index d72e7d2..e903c00 100644 --- a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py +++ b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py @@ -4,8 +4,8 @@ from databricks.bundles.core import ( Bundle, Resources, - load_resources_from_current_package_module, ) +from databricks.bundles.schemas import Schema def load_resources(bundle: Bundle) -> Resources: @@ -13,26 +13,49 @@ def load_resources(bundle: Bundle) -> Resources: 'load_resources' function is referenced in databricks.yml and is responsible for loading bundle resources defined in Python code. 
     This function is called by Databricks CLI during bundle deployment. After deployment, this function is not used.
-    """
-    # the default implementation loads all Python files in 'resources' directory
-    # return load_resources_from_current_package_module()
+    the default implementation loads all Python files in 'resources' directory
+    return load_resources_from_current_package_module()

-    """
-    load_resources() is called during bundle deployment
-    Here a job is created for every notebook in the src folder
+    Here a job is created for every notebook in the src folder
+    Plus a schema for the dev environment, to have one schema per user deploying the Job
     """
     resources = Resources()

-    for file in glob.glob("src/notebook*.py", recursive=True):
+
+    target_schema_name = "target_prod_schema" # this is the schema name for prod - should be deployed with Terraform
+
+    if bundle.target == "dev":
+        # create 1 schema per user in other environments
+        # note databricks.yml: the target dev is mode "development"
+        schema = Schema(
+            catalog_name="main",
+            name="prog_gen_target",
+            comment="Schema for output data"
+        )
+        resources.add_schema(
+            resource_name="project_schema",
+            schema=schema
+        )
+        target_schema_name = "${resources.schemas.project_schema.name}"
+
+    for file in glob.glob("src/*.sql", recursive=True):
         resources.add_job(
-            resource_name=os.path.basename(file).removesuffix(".py"),
+            resource_name=os.path.basename(file).removesuffix(".sql"),
             job={
                 "name": file,
                 "tasks": [
                     {
-                        "task_key": "notebook_task",
-                        "notebook_task": {"notebook_path": file},
-                    },
+                        "task_key": "create_table",
+                        "sql_task": {
+                            "parameters": {
+                                "target_schema": target_schema_name
+                            },
+                            "file": {
+                                "path": file,
+                            },
+                            "warehouse_id": "${resources.sql_warehouses.twoxs_warehouse.id}"
+                        },
+                    }
                 ],
             },
         )
diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py
deleted file mode 100644
index 16c8b1e..0000000
--- a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_extract.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Databricks notebook source
-lookup_file_name = dbutils.widgets.get('lookup_file_name')
-
-# COMMAND ----------
-
-import json
-from datetime import datetime, timedelta
-
-indexes = range(0,10)
-start_date = datetime.today()
-data = [{"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")} for index in indexes]
-dbutils.fs.put(lookup_file_name, json.dumps(data), overwrite=True)
-dbutils.jobs.taskValues.set("indexes", list(indexes))
-
-# COMMAND ----------
-
diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py b/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py
deleted file mode 100644
index a878dcc..0000000
--- a/knowledge_base/pydabs_job_programmatic_generation/src/notebook_process_item.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Databricks notebook source
-lookup_file_name = dbutils.widgets.get('lookup_file_name')
-index = int(dbutils.widgets.get('index'))
-
-# COMMAND ----------
-
-import json
-
-with open(lookup_file_name, "r") as f:
-    data = json.load(f)
-date = data[index].get("date")
-
-print(date)
-
-# COMMAND ----------
-
diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql
new file mode 100644
index 0000000..7dfcf36
--- /dev/null
+++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql
@@ -0,0 +1,42 @@
+USE CATALOG samples;
+USE SCHEMA tpcds_sf1;
+
+CREATE OR REPLACE TABLE IDENTIFIER(
+    'main.' || :target_schema || '.tpcds_query1'
+) AS
+    WITH customer_total_return AS (
+        SELECT
+            sr_customer_sk AS ctr_customer_sk,
+            sr_store_sk AS ctr_store_sk,
+            SUM(sr_return_amt) AS ctr_total_return
+        FROM
+            store_returns,
+            date_dim
+        WHERE
+            sr_returned_date_sk = d_date_sk
+            AND d_year = 2001
+        GROUP BY
+            sr_customer_sk,
+            sr_store_sk
+    )
+    SELECT
+        c_customer_id
+    FROM
+        customer_total_return ctr1,
+        store,
+        customer
+    WHERE
+        ctr1.ctr_total_return > (
+            SELECT
+                AVG(ctr_total_return) * 1.2
+            FROM
+                customer_total_return ctr2
+            WHERE
+                ctr1.ctr_store_sk = ctr2.ctr_store_sk
+        )
+        AND s_store_sk = ctr1.ctr_store_sk
+        AND s_state = 'TN'
+        AND ctr1.ctr_customer_sk = c_customer_sk
+    ORDER BY
+        c_customer_id
+    LIMIT 100;
\ No newline at end of file
diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql
new file mode 100644
index 0000000..b7b10ac
--- /dev/null
+++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql
@@ -0,0 +1,72 @@
+USE CATALOG samples;
+USE SCHEMA tpcds_sf1;
+
+CREATE OR REPLACE TABLE IDENTIFIER(
+    'main.' || :target_schema || '.tpcds_query2'
+)
+TBLPROPERTIES (
+    'delta.columnMapping.mode' = 'name'
+) AS
+WITH wscs AS (
+    SELECT sold_date_sk, sales_price
+    FROM (
+        SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
+        FROM web_sales
+        UNION ALL
+        SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
+        FROM catalog_sales
+    )
+),
+wswscs AS (
+    SELECT
+        d_week_seq,
+        SUM(CASE WHEN d_day_name = 'Sunday' THEN sales_price ELSE NULL END) AS sun_sales,
+        SUM(CASE WHEN d_day_name = 'Monday' THEN sales_price ELSE NULL END) AS mon_sales,
+        SUM(CASE WHEN d_day_name = 'Tuesday' THEN sales_price ELSE NULL END) AS tue_sales,
+        SUM(CASE WHEN d_day_name = 'Wednesday' THEN sales_price ELSE NULL END) AS wed_sales,
+        SUM(CASE WHEN d_day_name = 'Thursday' THEN sales_price ELSE NULL END) AS thu_sales,
+        SUM(CASE WHEN d_day_name = 'Friday' THEN sales_price ELSE NULL END) AS fri_sales,
+        SUM(CASE WHEN d_day_name = 'Saturday' THEN sales_price ELSE NULL END) AS sat_sales
+    FROM wscs
+    JOIN date_dim ON d_date_sk = sold_date_sk
+    GROUP BY d_week_seq
+)
+SELECT
+    d_week_seq1,
+    ROUND(sun_sales1 / sun_sales2, 2),
+    ROUND(mon_sales1 / mon_sales2, 2),
+    ROUND(tue_sales1 / tue_sales2, 2),
+    ROUND(wed_sales1 / wed_sales2, 2),
+    ROUND(thu_sales1 / thu_sales2, 2),
+    ROUND(fri_sales1 / fri_sales2, 2),
+    ROUND(sat_sales1 / sat_sales2, 2)
+FROM (
+    SELECT
+        wswscs.d_week_seq AS d_week_seq1,
+        sun_sales AS sun_sales1,
+        mon_sales AS mon_sales1,
+        tue_sales AS tue_sales1,
+        wed_sales AS wed_sales1,
+        thu_sales AS thu_sales1,
+        fri_sales AS fri_sales1,
+        sat_sales AS sat_sales1
+    FROM wswscs
+    JOIN date_dim ON date_dim.d_week_seq = wswscs.d_week_seq
+    WHERE d_year = 1998
+) y
+JOIN (
+    SELECT
+        wswscs.d_week_seq AS d_week_seq2,
+        sun_sales AS sun_sales2,
+        mon_sales AS mon_sales2,
+        tue_sales AS tue_sales2,
+        wed_sales AS wed_sales2,
+        thu_sales AS thu_sales2,
+        fri_sales AS fri_sales2,
+        sat_sales AS sat_sales2
+    FROM wswscs
+    JOIN date_dim ON date_dim.d_week_seq = wswscs.d_week_seq
+    WHERE d_year = 1999
+) z
+ON d_week_seq1 = d_week_seq2 - 53
+ORDER BY d_week_seq1;
\ No newline at end of file

From d9b3b3b7ecc86e164ec00670ef12802bf45a9d4d Mon Sep 17 00:00:00 2001
From: Lorenzo Rubio
Date: Mon, 24 Nov 2025 19:09:34 +0100
Subject: [PATCH 6/7] typo

---
 knowledge_base/pydabs_job_programmatic_generation/mutators.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py
index 2e73300..b928905 100644
--- a/knowledge_base/pydabs_job_programmatic_generation/mutators.py
+++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py
@@ -1,6 +1,6 @@
 from dataclasses import replace

-from databricks.bundles.core import Bundle, job_mutator, mu
+from databricks.bundles.core import Bundle, job_mutator
 from databricks.bundles.jobs import Job, JobEmailNotifications

 @job_mutator

From 01e32caceb54f992b8e54ad4968e3a152a3c8969 Mon Sep 17 00:00:00 2001
From: Lorenzo Rubio
Date: Thu, 11 Dec 2025 15:44:32 +0100
Subject: [PATCH 7/7] ruff reformatting

---
 .../resources/conditional_execution.py       | 36 +++++--------------
 .../resources/file_arrival.py                 |  6 ++--
 .../mutators.py                               |  3 +-
 .../resources/__init__.py                     | 19 ++++------
 .../main.py                                   |  1 +
 .../resources/for_each_simple.py              | 18 +++++-----
 .../src/notebook_extract.py                   | 10 +++---
 .../src/notebook_process_item.py              |  5 ++-
 .../src/pydabs_job_with_for_each/main.py      |  1 +
 .../resources/task_values_simple.py           | 13 ++++++-
 .../src/notebook_task_a.py                    |  2 +-
 .../src/notebook_task_b.py                    |  2 +-
 .../src/pydabs_job_with_task_values/main.py   |  1 +
 13 files changed, 56 insertions(+), 61 deletions(-)

diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py
index f4242ec..7034b77 100644
--- a/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py
+++ b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py
@@ -11,44 +11,26 @@
     tasks=[
         Task(
             task_key="check_data_quality",
-            notebook_task=NotebookTask(
-                notebook_path="src/check_quality.ipynb"
-            ),
+            notebook_task=NotebookTask(notebook_path="src/check_quality.ipynb"),
         ),
         Task(
             task_key="evaluate_quality",
             condition_task=ConditionTask(
                 left="{{tasks.check_data_quality.values.bad_records}}",
                 op="GREATER_THAN",
-                right="100"
+                right="100",
             ),
-            depends_on=[
-                TaskDependency(task_key="check_data_quality")
-            ],
+            depends_on=[TaskDependency(task_key="check_data_quality")],
         ),
         Task(
             task_key="handle_bad_data",
-            notebook_task=NotebookTask(
-                notebook_path="src/process_bad_data.ipynb"
-            ),
-            depends_on=[
-                TaskDependency(
-                    task_key="evaluate_quality",
-                    outcome="true"
-                )
-            ],
+            notebook_task=NotebookTask(notebook_path="src/process_bad_data.ipynb"),
+            depends_on=[TaskDependency(task_key="evaluate_quality", outcome="true")],
         ),
         Task(
             task_key="continue_pipeline",
-            notebook_task=NotebookTask(
-                notebook_path="src/process_good_data.ipynb"
-            ),
-            depends_on=[
-                TaskDependency(
-                    task_key="evaluate_quality",
-                    outcome="false"
-                )
-            ],
-        )
-    ]
+            notebook_task=NotebookTask(notebook_path="src/process_good_data.ipynb"),
+            depends_on=[TaskDependency(task_key="evaluate_quality", outcome="false")],
+        ),
+    ],
 )
diff --git a/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py
index 58ddb45..022b5ce 100644
--- a/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py
+++ b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py
@@ -13,7 +13,9 @@
             task_key="process_new_files",
             notebook_task=NotebookTask(
                 notebook_path="src/process_files.ipynb",
-                base_parameters={"file_arrival_location": "{{job.trigger.file_arrival.location}}"},
+                base_parameters={
+                    "file_arrival_location": "{{job.trigger.file_arrival.location}}"
+                },
             ),
         )
     ],
@@ -24,4 +26,4 @@
             wait_after_last_change_seconds=90,
         ),
     ),
-)
\ No newline at end of file
+)
diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py
index b928905..0ef6172 100644
--- a/knowledge_base/pydabs_job_programmatic_generation/mutators.py
+++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py
@@ -3,9 +3,10 @@
 from databricks.bundles.core import Bundle, job_mutator
 from databricks.bundles.jobs import Job, JobEmailNotifications

+
 @job_mutator
 def add_email_notifications(bundle: Bundle, job: Job) -> Job:
-    if bundle.target == 'dev':
+    if bundle.target == "dev":
         return job

     email_notifications = JobEmailNotifications.from_dict(
diff --git a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py
index e903c00..c6b6516 100644
--- a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py
+++ b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py
@@ -22,20 +22,17 @@ def load_resources(bundle: Bundle) -> Resources:
     """
     resources = Resources()

-    target_schema_name = "target_prod_schema" # this is the schema name for prod - should be deployed with Terraform
+    target_schema_name = "target_prod_schema"  # this is the schema name for prod - should be deployed with Terraform

     if bundle.target == "dev":
         # create 1 schema per user in other environments
         # note databricks.yml: the target dev is mode "development"
         schema = Schema(
-            catalog_name="main",
-            name="prog_gen_target",
-            comment="Schema for output data"
-        )
-        resources.add_schema(
-            resource_name="project_schema",
-            schema=schema
+            catalog_name="main",
+            name="prog_gen_target",
+            comment="Schema for output data",
         )
+        resources.add_schema(resource_name="project_schema", schema=schema)
         target_schema_name = "${resources.schemas.project_schema.name}"

     for file in glob.glob("src/*.sql", recursive=True):
diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py
index 3339ef2..cd9ac48 100644
--- a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py
+++ b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py
@@ -1,5 +1,6 @@
 def main():
     pass

+
 if __name__ == "__main__":
     main()
diff --git a/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py
index fad4215..ea3b7b9 100644
--- a/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py
+++ b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py
@@ -10,17 +10,17 @@
         notebook_path="src/notebook_process_item.py",
         base_parameters={
             "index": "{{input}}",
-            },
+        },
     ),
 )
 process_item = Task(
-    task_key='process_item',
+    task_key="process_item",
     depends_on=[TaskDependency(task_key="extract")],
     for_each_task=ForEachTask(
-        inputs='{{tasks.extract.values.indexes}}',
+        inputs="{{tasks.extract.values.indexes}}",
         task=process_item_iteration,
-        concurrency=10
-    )
+        concurrency=10,
+    ),
 )

 for_each_example = Job(
@@ -30,9 +30,9 @@
         process_item,
     ],
     parameters=[
-      {
-          "name": "lookup_file_name",
-          "default": "/Volumes/main/for_each_example/hotchpotch/my_file.json",
-      },
+        {
+            "name": "lookup_file_name",
+            "default": "/Volumes/main/for_each_example/hotchpotch/my_file.json",
+        },
     ],
 )
diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py
index 16c8b1e..ebda713 100644
--- a/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py
+++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_extract.py
@@ -1,16 +1,18 @@
 # Databricks notebook source
-lookup_file_name = dbutils.widgets.get('lookup_file_name')
+lookup_file_name = dbutils.widgets.get("lookup_file_name")

 # COMMAND ----------

 import json
 from datetime import datetime, timedelta

-indexes = range(0,10)
+indexes = range(0, 10)
 start_date = datetime.today()
-data = [{"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")} for index in indexes]
+data = [
+    {"date": (start_date + timedelta(days=index)).strftime("%Y-%m-%d")}
+    for index in indexes
+]
 dbutils.fs.put(lookup_file_name, json.dumps(data), overwrite=True)
 dbutils.jobs.taskValues.set("indexes", list(indexes))

 # COMMAND ----------
-
diff --git a/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py
index a878dcc..ba439a8 100644
--- a/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py
+++ b/knowledge_base/pydabs_job_with_for_each/src/notebook_process_item.py
@@ -1,6 +1,6 @@
 # Databricks notebook source
-lookup_file_name = dbutils.widgets.get('lookup_file_name')
-index = int(dbutils.widgets.get('index'))
+lookup_file_name = dbutils.widgets.get("lookup_file_name")
+index = int(dbutils.widgets.get("index"))

 # COMMAND ----------

@@ -13,4 +13,3 @@
 print(date)

 # COMMAND ----------
-
diff --git a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py
index 3339ef2..cd9ac48 100644
--- a/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py
+++ b/knowledge_base/pydabs_job_with_for_each/src/pydabs_job_with_for_each/main.py
@@ -1,5 +1,6 @@
 def main():
     pass

+
 if __name__ == "__main__":
     main()
diff --git a/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py
index 45e91f4..e47787d 100644
--- a/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py
+++ b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py
@@ -9,11 +9,22 @@
     depends_on=[TaskDependency(task_key="task_a")],
     notebook_task=NotebookTask(notebook_path="src/notebook_task_b.py"),
 )
+task_c = Task(
+    task_key="task_c",
+    notebook_task=NotebookTask(notebook_path="src/notebook_task_c.py"),
+)
+task_d = Task(
+    task_key="task_d",
+    depends_on=[TaskDependency(task_key="task_c")],
+    notebook_task=NotebookTask(notebook_path="src/notebook_task_d.py"),
+)

-task_values_simple=Job(
+task_values_simple = Job(
     name="task_values_simple",
     tasks=[
         task_a,
         task_b,
+        task_c,
+        task_d,
     ],
 )
diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py
index c3715d0..63e4e2e 100644
--- a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py
+++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_a.py
@@ -1,3 +1,3 @@
 # Databricks notebook source
 val = [42, 12, 1812]
-dbutils.jobs.taskValues.set(key="my_key", value=val)
\ No newline at end of file
+dbutils.jobs.taskValues.set(key="my_key", value=val)
diff --git a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py
index 578de83..e49eeb3 100644
--- a/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py
+++ b/knowledge_base/pydabs_job_with_task_values/src/notebook_task_b.py
@@ -1,3 +1,3 @@
 # Databricks notebook source
 val = dbutils.jobs.taskValues.get(taskKey="task_a", key="my_key")
-print(val)
\ No newline at end of file
+print(val)
diff --git a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py
index 3339ef2..cd9ac48 100644
--- a/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py
+++ b/knowledge_base/pydabs_job_with_task_values/src/pydabs_job_with_task_values/main.py
@@ -1,5 +1,6 @@
 def main():
     pass

+
 if __name__ == "__main__":
     main()
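
The reformatted task_values_simple.py above now wires task_c and task_d into the job, but the notebooks they point at (src/notebook_task_c.py and src/notebook_task_d.py) are not included in this patch series. A minimal sketch of what they might contain, following the same set/get pattern as notebook_task_a.py and notebook_task_b.py; the key name and the stored value here are assumptions for illustration, not taken from the patch:

    # src/notebook_task_c.py (hypothetical, not part of this patch)
    # Databricks notebook source
    # Store a small, JSON-serializable value for downstream tasks to read.
    val = {"rows_processed": 100}
    dbutils.jobs.taskValues.set(key="my_other_key", value=val)

    # src/notebook_task_d.py (hypothetical, not part of this patch)
    # Databricks notebook source
    # Read the value by addressing the upstream task via its task_key.
    val = dbutils.jobs.taskValues.get(taskKey="task_c", key="my_other_key")
    print(val)

As with task_a and task_b, the downstream notebook looks the value up by the upstream task_key, which is why task_d declares depends_on=[TaskDependency(task_key="task_c")] in task_values_simple.py.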