diff --git a/knowledge_base/scalable_mono_repo_de/.flake8 b/knowledge_base/scalable_mono_repo_de/.flake8
new file mode 100644
index 0000000..35fd8ad
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/.flake8
@@ -0,0 +1,5 @@
+[flake8]
+ignore = E501, W503, F403
+max-line-length = 90
+max-complexity = 18
+select = B,C,E,F,W,T4,B9
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/.github/workflows/staging_workflow.yml b/knowledge_base/scalable_mono_repo_de/.github/workflows/staging_workflow.yml
new file mode 100644
index 0000000..adc0434
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/.github/workflows/staging_workflow.yml
@@ -0,0 +1,93 @@
+# NOTE - A lot of this can be configured using environments & a re-usable workflow.
+name: "Release workflow for staging environment."
+
+# Ensure that only a single job or workflow using the same concurrency group
+# runs at a time.
+concurrency: 1
+
+# Trigger this workflow whenever changes are pushed to the repo's
+# staging branch (e.g. when a pull request is merged into it)
+on:
+  push:
+    branches:
+      - staging
+
+jobs:
+  test:
+    name: "Test python packages"
+    runs-on: ubuntu-latest
+
+    steps:
+      # Check out this repo
+      - uses: actions/checkout@v3
+
+      # Use the specified python version
+      - uses: actions/setup-python@v5
+        with:
+          python-version: '3.10.14'
+
+      # Download the Databricks CLI for bundle commands
+      - uses: databricks/setup-cli@main
+
+      # Run tests remotely; under the hood, the service principal runs the
+      # tests defined in this repository on serverless compute
+      - run: make ci-test
+        working-directory: .
+        env:
+          DATABRICKS_HOST: https://company.databricks.com
+          DATABRICKS_CLIENT_ID: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_ID_STAGING }}
+          DATABRICKS_CLIENT_SECRET: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_SECRET_STAGING }}
+          DATABRICKS_CLUSTER_ID: ${{ secrets.YOUR_DATABRICKS_CLUSTER_ID_STAGING }}
+          DATABRICKS_BUNDLE_ENV: staging
+
+  validate:
+    name: "Validate bundle"
+    runs-on: ubuntu-latest
+
+    # Only run if tests pass
+    needs:
+      - test
+
+    steps:
+      # Check out this repo
+      - uses: actions/checkout@v3
+
+      # Download the Databricks CLI for bundle commands
+      - uses: databricks/setup-cli@main
+
+      # Validate the bundle configuration before we try to deploy anything
+      # Ideally we would also do something like a "dry run" here, once that
+      # functionality exists
+      - run: databricks bundle validate -t staging
+        working-directory: .
+        env:
+          DATABRICKS_HOST: https://company.databricks.com
+          DATABRICKS_CLIENT_ID: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_ID_STAGING }}
+          DATABRICKS_CLIENT_SECRET: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_SECRET_STAGING }}
+          DATABRICKS_BUNDLE_ENV: staging
+
+
+  deploy:
+    name: "Deploy bundle"
+    runs-on: ubuntu-latest
+
+    # Only run if validate succeeds
+    needs:
+      - validate
+
+    steps:
+      # Check out this repo
+      - uses: actions/checkout@v3
+
+      # Download the Databricks CLI for bundle commands
+      - uses: databricks/setup-cli@main
+
+      # Deploy the bundle to the "staging" target as defined
+      # in the bundle's configuration file
+      - run: databricks bundle deploy -t staging --auto-approve
+        working-directory: .
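+        # Authentication is done as a service principal via OAuth M2M: the Databricks
+        # CLI picks up DATABRICKS_HOST, DATABRICKS_CLIENT_ID & DATABRICKS_CLIENT_SECRET
+        # from the environment. The secret names used throughout this workflow are
+        # placeholders for this demo - rename them to match the secrets in your repo.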
+ env: + DATABRICKS_HOST: https://company.databricks.com + DATABRICKS_CLIENT_ID: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_ID_STAGING }} + DATABRICKS_CLIENT_SECRET: ${{ secrets.YOUR_SERVICE_PRINCIPLE_CLIENT_SECRET_STAGING }} + DATABRICKS_BUNDLE_ENV: staging \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/.gitignore b/knowledge_base/scalable_mono_repo_de/.gitignore new file mode 100644 index 0000000..b432c77 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/.gitignore @@ -0,0 +1,10 @@ +.databricks/ +build/ +dist/ +__pycache__/ +*.egg-info +.venv/ +scratch/** +!scratch/README.md +.vscode +.vscode/* \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/.pre-commit-config.yaml b/knowledge_base/scalable_mono_repo_de/.pre-commit-config.yaml new file mode 100644 index 0000000..da70d6e --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +repos: + # Linting +- repo: https://github.com/ambv/black + rev: 23.1.0 + hooks: + - id: black + language_version: python3.10 + # flake8 +- repo: https://github.com/PyCQA/flake8.git + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: [ + 'flake8-future-annotations==0.0.4', + ] \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/.python-version b/knowledge_base/scalable_mono_repo_de/.python-version new file mode 100644 index 0000000..9dfc796 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/.python-version @@ -0,0 +1 @@ +3.10.14 \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/Makefile b/knowledge_base/scalable_mono_repo_de/Makefile new file mode 100644 index 0000000..e8307ab --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/Makefile @@ -0,0 +1,64 @@ +# Global vars. +VENV=.venv +PYTHON_VERSION=3.10.14 +PYTHON=${VENV}/bin/python + +# Define standard colours. +GREEN=\033[0;32m +RED=\033[0;31m +BLUE=\033[0;34m + +.PHONY: clean +clean: +### Remove any existing virtual environments & temp files. + @echo "${RED}Removing existing virtual environments." + rm -rf .python-version + rm -rf $(VENV) + + @echo "${GREEN}Removing temp files${NORMAL}" + -rm -rf .cache + -rm -rf .pytest_cache + -rm -rf coverage + -rm -rf .coverage + -rm -rf build + -rm -rf */*/build + -rm -rf dist + -rm -rf */*/dist + -rm -rf *.egg-info + -rm -rf */*/*.egg-info + -rm -rf *.whl + +build-local-virtualenv: +### Install python version locally using pyenv & set it to local version used +### for development. + @echo "${GREEN}Installing default python version using pyenv." + pyenv install -s $(PYTHON_VERSION) + pyenv local $(PYTHON_VERSION) + @echo "${GREEN}Creating virtual environment." + test -d $(VENV) || $(HOME)/.pyenv/versions/$(PYTHON_VERSION)/bin/python -m venv $(VENV) + + @echo "${GREEN}Building root environment for local testing & databricks connect" + . $(VENV)/bin/activate && \ + pip install -r requirements-dev.txt && \ + pre-commit install + +.PHONY: setup +### Setup local virtual environment for testing & development. +setup: clean build-local-virtualenv + +.PHONY: test +### Run tests on remote. +test: + @echo "${GREEN}Running tests" + $(PYTHON) -m pytest -s tests/ -v -p no:warnings + +build-test: setup test + +########################################### +### CI ### +########################################### + +ci-test: +### This should probably be cleaned up & improved. 
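+### Note: unlike `make test`, this target uses the runner's default python3 & expects
+### Databricks authentication to be provided via environment variables
+### (DATABRICKS_HOST, DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET, ...) as set in
+### .github/workflows/staging_workflow.yml.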
+	pip install -r requirements-dev.txt && \
+	python3 -m pytest -s tests/ -v
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/README.md b/knowledge_base/scalable_mono_repo_de/README.md
new file mode 100644
index 0000000..e20593e
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/README.md
@@ -0,0 +1,108 @@
+# Scalable Databricks Asset Bundles (DABs) mono-repo.
+
+This project aims to give users an idea of how you can structure your DABs git repositories in a scalable & effective manner, as well as some general best practices & CI/CD examples.
+
+**This repo is only intended to be used for demonstrative purposes. Neither I nor Databricks are liable for any shortcomings in this project.**
+
+## Prerequisites
+
+1\. Python versions & environments are managed via `pyenv`. You can [install pyenv](https://github.com/pyenv/pyenv?tab=readme-ov-file#installation) using a package manager such as [homebrew](https://docs.brew.sh/):
+
+```
+brew update
+brew install pyenv
+```
+
+## Getting started
+
+1\. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/databricks-cli.html
+
+2a. Authenticate to your Sandbox / Development workspace, if you have not done so already:
+   ```
+   $ databricks configure
+   ```
+
+2b. Set up your default Databricks profile in `.databrickscfg` so that any validation & deployment requests are made against that workspace:
+   ```
+   host =
+   serverless_compute_id = auto
+   token =
+   ```
+
+**Note:** it is advised that you use serverless compute where possible to run your tests, as this provides the shortest feedback loop for development. If you want to use an interactive cluster instead, remove the `serverless_compute_id = auto` entry & replace it with a `cluster_id = ` entry.
+
+3\. Set up your local environment for development purposes by running:
+   ```
+   make setup
+   ```
+This creates a local python virtual environment & installs all project dependencies. It also installs `pre-commit` hooks, which are entirely optional.
+
+4\. Verify that your environment is correctly configured by running:
+
+   ```
+   make test
+   ```
+
+This will run all package tests defined in `./tests/` remotely in your Databricks workspace, on serverless or interactive compute depending on which you have specified. Alternatively, you _could_ run this locally by containerising spark & running your tests against that.
+
+5\. To deploy a development copy of this project, type:
+   ```
+   $ databricks bundle deploy --target dev
+   ```
+(Note that "dev" is the default target, so the `--target` flag is optional here.)
+
+This deploys everything that's defined for this project.
+For example, the default template would deploy a job called
+`[dev yourname] my_workflow_dev` to your workspace.
+You can find that job by opening your workspace and clicking on **Workflows**.
+
+6\. Verify that the job has been deployed by running:
+   ```
+   $ databricks bundle run my_serverless_workflow -t dev
+   ```
+
+You should see something like the following from your IDE:
+
+```
+Run URL: https://company.databricks.com//
+
+2024-01-01 00:00:00 "[dev ] my_serverless_workflow_dev" RUNNING
+.
+.
+.
+```
+
+You can verify that the job is running by visiting the UI.
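+
+The same `databricks bundle run` command works for the other resources defined in this repo (resource keys as defined under `resources/jobs` & `resources/pipelines`), for example:
+
+```
+$ databricks bundle run my_workflow -t dev
+$ databricks bundle run my_dlt_pipeline -t dev
+```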
+Once the job has started, you should see the cluster logs in your IDE again:
+
+```
+cowsay is installed & has version: 6.1
+boto3 is installed & has version: 1.0.0
+get_taxis_data is installed & has version: 0.1.0
+utils is installed & has version: 0.1.0
++--------------------+---------------------+-------------+-----------+----------+-----------+--------------------+
+|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|processing_timestamp|
++--------------------+---------------------+-------------+-----------+----------+-----------+--------------------+
+| 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|2024-10-25 15:00:...|
+| 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|2024-10-25 15:00:...|
+| 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|2024-10-25 15:00:...|
+| 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|2024-10-25 15:00:...|
+| 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|2024-10-25 15:00:...|
++--------------------+---------------------+-------------+-----------+----------+-----------+--------------------+
+only showing top 5 rows
+```
+
+## Intended Usage.
+
+The intended workflow for this project / demo looks something like the following:
+
+1\. Contributors branch off the remote staging branch for their new features.
+
+2\. As contributors make their development changes locally on this new branch, they can run their tests either locally or in their remote sandbox / development Databricks workspace, using a compute resource & DBR of their choice.
+
+3\. Contributors can also deploy their changes to this sandbox / development workspace for integrated testing & to run jobs or workflows if they want to.
+
+4\. Once the contributor is happy with their changes, they can push their changes to the remote feature branch & open a PR.
+
+5\. Upon merge into the `staging` branch, the GitHub workflow defined at `.github/workflows` will run the same tests, validation & deployment in a controlled environment, using a service principal. This will deploy all changes to the staging workspace.
+
+6\. Once the deployment has succeeded & further testing in staging has been done, the same process is carried out to deploy into production (this still needs to be done).
diff --git a/knowledge_base/scalable_mono_repo_de/databricks.yml b/knowledge_base/scalable_mono_repo_de/databricks.yml
new file mode 100644
index 0000000..dd0f6a3
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/databricks.yml
@@ -0,0 +1,79 @@
+# This is a Databricks asset bundle definition for my_project.
+# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
+bundle:
+  name: dabs_bootstrap
+
+include:
+  - resources/jobs/*.yml
+  - resources/pipelines/*.yml
+
+# Define re-usable variables, including a complex one for the serverless environment spec
+variables:
+  env:
+    description: Environment value for job name injection.
+    default: dev
+  default_serverless_env_spec:
+    description: Default serverless environment configuration (example).
+    type: complex
+    default:
+      client: "1"
+      dependencies:
+        - -r "/Workspace${workspace.file_path}/environments/default-requirements.txt"
+        - ../../src/packages/get_taxis_data/dist/*.whl
+        - ../../src/packages/utils/dist/*.whl
+
+# Build artifacts using poetry, in this case we only have two
+artifacts:
+  utils_package:
+    type: whl
+    build: poetry build
+    path: src/packages/utils/
+  get_taxis_data_package:
+    type: whl
+    build: poetry build
+    path: src/packages/get_taxis_data/
+
+targets:
+  # The 'dev' target, for development purposes. This target is the default
+  dev:
+    # We use 'mode: development' to indicate this is a personal development copy:
+    # - Deployed resources get prefixed with '[dev my_user_name]'
+    # - Any job schedules and triggers are paused by default
+    # - The 'development' mode is used for Delta Live Tables pipelines
+    mode: development
+    default: true
+    workspace:
+      host: https://company.databricks.com
+
+  # The 'staging' target, used for UAT deployment - we mimic production here
+  staging:
+    # We use 'mode: production' to indicate this is a production deployment
+    # Doing so enables strict verification of the settings below
+    mode: production
+    workspace:
+      host: https://company.databricks.com
+      # We always use /Shared/.bundle/${bundle.name} for all resources to make sure we only have a single copy
+      root_path: /Shared/.bundle/${bundle.name}
+    run_as:
+      # This runs as your service principal in staging
+      service_principal_name: {{ your_service_principal_id }}
+    # Override the `env` variable to dynamically inject "staging" into our resource names
+    variables:
+      env: staging
+
+  # The 'prod' target, used for production deployment
+  prod:
+    # We use 'mode: production' to indicate this is a production deployment
+    # Doing so enables strict verification of the settings below
+    mode: production
+    workspace:
+      host: https://company.databricks.com
+      # We always use /Shared/.bundle/${bundle.name} for all resources to make sure we only have a single copy
+      root_path: /Shared/.bundle/${bundle.name}
+    run_as:
+      # This runs as your service principal in prod
+      service_principal_name: {{ your_service_principal_id }}
+    # Override the `env` variable to dynamically inject "prod" into our resource names
+    variables:
+      env: prod
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/environments/default-requirements.txt b/knowledge_base/scalable_mono_repo_de/environments/default-requirements.txt
new file mode 100644
index 0000000..93195d1
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/environments/default-requirements.txt
@@ -0,0 +1,2 @@
+cowsay==6.1
+boto3==1.0
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/pytest.ini b/knowledge_base/scalable_mono_repo_de/pytest.ini
new file mode 100644
index 0000000..de19c9f
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+testpaths = tests
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/requirements-dev.txt b/knowledge_base/scalable_mono_repo_de/requirements-dev.txt
new file mode 100644
index 0000000..49f83a3
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/requirements-dev.txt
@@ -0,0 +1,34 @@
+## requirements-dev.txt: dependencies for local development.
+##
+## For defining dependencies used by jobs in Databricks Workflows, see
+## https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
+
+## Pinning these for ease of use & maintainability moving forward.
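+## These are installed into the local virtual environment by `make setup` (see the
+## Makefile) & onto the CI runner by `make ci-test`; they are not shipped to Databricks.
+## Dependencies for the jobs themselves live in environments/default-requirements.txt
+## & the package wheels built from src/packages/.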
+
+## Add code completion support for DLT
+databricks-dlt
+
+## pytest is the default package used for testing
+pytest
+
+## Dependencies for building wheel files
+wheel
+
+## databricks-connect can be used to run parts of this project locally.
+## See https://docs.databricks.com/dev-tools/databricks-connect.html.
+##
+## databricks-connect is automatically installed if you're using the Databricks
+## extension for Visual Studio Code
+## (https://docs.databricks.com/dev-tools/vscode-ext/dev-tasks/databricks-connect.html).
+##
+## To manually install databricks-connect, either follow the instructions
+## at https://docs.databricks.com/dev-tools/databricks-connect.html
+## to install the package system-wide, or use the pinned line below to install a
+## version of databricks-connect that corresponds to the Databricks Runtime version
+## used for this project.
+## Ideally, for the shortest feedback loop (especially in CI/CD pipelines), we should
+## use serverless compute here.
+databricks-connect==15.1
+
+## Pre-commit for commit hooks - if you do not want to use or enable pre-commit hooks,
+## comment out this line & remove `pre-commit install` from the `setup` target in the Makefile
+pre-commit==3.1
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_job.yml b/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_job.yml
new file mode 100644
index 0000000..8012f8d
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_job.yml
@@ -0,0 +1,41 @@
+# The main job for my_project.
+resources:
+  jobs:
+    my_workflow:
+      name: my_workflow_${var.env}
+
+      schedule:
+        # Run every day at 8 AM
+        quartz_cron_expression: '0 0 8 * * ?'
+        timezone_id: Europe/London
+
+      email_notifications:
+        on_failure:
+          - {{ your_email_address }}
+
+      tasks:
+        - task_key: standard_job_task
+          job_cluster_key: job_cluster
+          notebook_task:
+            notebook_path: ../../src/notebooks/notebook.ipynb
+          libraries:
+            # We include both wheel files from our packages.
+            # See https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
+            # for more information on how to add other libraries.
+            - whl: ../../src/packages/get_taxis_data/dist/*.whl
+            - whl: ../../src/packages/utils/dist/*.whl
+
+        - task_key: dlt_pipeline_task
+          depends_on:
+            - task_key: standard_job_task
+          pipeline_task:
+            pipeline_id: ${resources.pipelines.my_dlt_pipeline.id}
+
+      job_clusters:
+        - job_cluster_key: job_cluster
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            autoscale:
+              min_workers: 1
+              max_workers: 4
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_serverless_job.yml b/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_serverless_job.yml
new file mode 100644
index 0000000..7762a10
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/resources/jobs/my_project_serverless_job.yml
@@ -0,0 +1,26 @@
+# The main configuration for the serverless job
+resources:
+  jobs:
+    my_serverless_workflow:
+      name: my_serverless_workflow_${var.env}
+
+      schedule:
+        # Run every day at 8:00 AM
+        quartz_cron_expression: '0 0 8 * * ?'
+        timezone_id: Europe/Amsterdam
+
+      email_notifications:
+        on_failure:
+          - {{ your_email_address }}
+
+      tasks:
+        - task_key: python_job_serverless
+          spark_python_task:
+            python_file: ../../src/python/serverless_job.py
+
+          # The key that references an environment spec in a job.
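+          # For reference, ${var.default_serverless_env_spec} (defined under `variables`
+          # in databricks.yml) resolves to roughly the following spec:
+          #   client: "1"
+          #   dependencies:
+          #     - -r "/Workspace${workspace.file_path}/environments/default-requirements.txt"
+          #     - ../../src/packages/get_taxis_data/dist/*.whl
+          #     - ../../src/packages/utils/dist/*.whl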
+ environment_key: default + + environments: + - environment_key: default + spec: ${var.default_serverless_env_spec} \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/resources/pipelines/my_project_pipeline.yml b/knowledge_base/scalable_mono_repo_de/resources/pipelines/my_project_pipeline.yml new file mode 100644 index 0000000..6a09379 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/resources/pipelines/my_project_pipeline.yml @@ -0,0 +1,16 @@ +# The main pipeline for my_project +resources: + pipelines: + my_dlt_pipeline: + name: my_dlt_pipeline_${var.env} + catalog: {{ your_metastore_catalog }} + # In lower environments for local deployments, make this schema value unique for best testing experience (UC isolation) + target: {{ your_schema_target }} + libraries: + - notebook: + path: ../../src/pipelines/dlt_pipeline.ipynb + + configuration: + bundle.sourcePath: /Workspace/${workspace.file_path}/src + dlt.dependencies.get_taxis_data: /Workspace${workspace.file_path}/src/packages/get_taxis_data + dlt.dependencies.utils: /Workspace${workspace.file_path}/src/packages/utils \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/src/notebooks/notebook.ipynb b/knowledge_base/scalable_mono_repo_de/src/notebooks/notebook.ipynb new file mode 100644 index 0000000..e00f5d2 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/src/notebooks/notebook.ipynb @@ -0,0 +1,88 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ee353e42-ff58-4955-9608-12865bd0950e", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Default notebook\n", + "\n", + "This default notebook is executed using Databricks Workflows as defined in resources/my_project_job.yml." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "6bca260b-13d1-448f-8082-30b60a85c9ae", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from get_taxis_data.main import get_taxis_data\n", + "\n", + "df = get_taxis_data(spark)\n", + "df.show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from utils.main import add_processing_timestamp\n", + "\n", + "df = add_processing_timestamp(df=df)\n", + "df.show(5)" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "notebook", + "widgets": {} + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/README.md b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/README.md new file mode 100644 index 0000000..a62b396 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/README.md @@ -0,0 +1,3 @@ +# **get_taxis_data** + +This simple python package returns taxi data from the sample data that is pre-loaded in a databricks workspace. \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/get_taxis_data/main.py b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/get_taxis_data/main.py new file mode 100644 index 0000000..14ec0e3 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/get_taxis_data/main.py @@ -0,0 +1,17 @@ +from pyspark.sql import SparkSession, DataFrame + +# Create a new Databricks Connect session. If this fails, +# check that you have configured Databricks Connect correctly. +# See https://docs.databricks.com/dev-tools/databricks-connect.html. +def get_spark() -> SparkSession: + try: + from databricks.connect import DatabricksSession + return DatabricksSession.builder.serverless(True).getOrCreate() + except ImportError: + return SparkSession.builder.getOrCreate() + +def get_taxis_data(spark: SparkSession) -> DataFrame: + """ + Get pre-loaded taxis data from the Databricks workspace. + """ + return spark.read.table("samples.nyctaxi.trips") \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/poetry.lock b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/poetry.lock new file mode 100644 index 0000000..23f6c7d --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/poetry.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. 
+package = []
+
+[metadata]
+lock-version = "2.0"
+python-versions = "^3.10"
+content-hash = "53f2eabc9c26446fbcc00d348c47878e118afc2054778c3c803a0a8028af27d9"
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/pyproject.toml b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/pyproject.toml
new file mode 100644
index 0000000..7609264
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/packages/get_taxis_data/pyproject.toml
@@ -0,0 +1,20 @@
+[tool.poetry]
+name = "get-taxis-data"
+version = "0.1.0"
+description = "Python package that gets pre-loaded taxi data from your Databricks workspace."
+authors = ["Your Name "]
+readme = "README.md"
+
+packages = [
+    { include = "get_taxis_data" }
+]
+
+[tool.poetry.scripts]
+main = "get_taxis_data.main:get_taxis_data"
+
+[tool.poetry.dependencies]
+python = "^3.10"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/utils/README.md b/knowledge_base/scalable_mono_repo_de/src/packages/utils/README.md
new file mode 100644
index 0000000..e727343
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/packages/utils/README.md
@@ -0,0 +1,5 @@
+# **utils**
+
+## **Functions.**
+
+* **add_processing_timestamp** - Adds the current timestamp as a column to a dataframe.
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/utils/poetry.lock b/knowledge_base/scalable_mono_repo_de/src/packages/utils/poetry.lock
new file mode 100644
index 0000000..23f6c7d
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/packages/utils/poetry.lock
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+package = []
+
+[metadata]
+lock-version = "2.0"
+python-versions = "^3.10"
+content-hash = "53f2eabc9c26446fbcc00d348c47878e118afc2054778c3c803a0a8028af27d9"
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/utils/pyproject.toml b/knowledge_base/scalable_mono_repo_de/src/packages/utils/pyproject.toml
new file mode 100644
index 0000000..b87b2b1
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/packages/utils/pyproject.toml
@@ -0,0 +1,20 @@
+[tool.poetry]
+name = "utils"
+version = "0.1.0"
+description = "Python package that contains useful helper functions for spark streaming jobs."
+authors = ["Your Name "]
+readme = "README.md"
+
+packages = [
+    { include = "utils" }
+]
+
+[tool.poetry.scripts]
+main = "utils.main:add_processing_timestamp"
+
+[tool.poetry.dependencies]
+python = "^3.10"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/packages/utils/utils/main.py b/knowledge_base/scalable_mono_repo_de/src/packages/utils/utils/main.py
new file mode 100644
index 0000000..54e18a8
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/packages/utils/utils/main.py
@@ -0,0 +1,8 @@
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import current_timestamp
+
+def add_processing_timestamp(df: DataFrame) -> DataFrame:
+    """
+    Add the current timestamp to a dataframe as a 'processing_timestamp' column.
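+
+    Example (mirrors tests/utils/test_utils.py; `df` can be any Spark DataFrame):
+        df = add_processing_timestamp(df=df)
+        assert "processing_timestamp" in df.columns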
+    """
+    return df.withColumn('processing_timestamp', current_timestamp())
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/src/pipelines/dlt_pipeline.ipynb b/knowledge_base/scalable_mono_repo_de/src/pipelines/dlt_pipeline.ipynb
new file mode 100644
index 0000000..a12e225
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/pipelines/dlt_pipeline.ipynb
@@ -0,0 +1,93 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "# DLT pipeline\n",
+    "\n",
+    "This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/my_project_pipeline.yml."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import sys\n",
+    "sys.path.append(spark.conf.get(\"dlt.dependencies.get_taxis_data\", \".\"))\n",
+    "sys.path.append(spark.conf.get(\"dlt.dependencies.utils\", \".\"))\n",
+    "\n",
+    "# Import DLT\n",
+    "import dlt\n",
+    "from pyspark.sql.functions import expr\n",
+    "from get_taxis_data.main import get_taxis_data\n",
+    "from utils.main import add_processing_timestamp"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "3fc19dba-61fd-4a89-8f8c-24fee63bfb14",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "@dlt.view()\n",
+    "def taxi_raw():\n",
+    "  return add_processing_timestamp(df=get_taxis_data(spark))\n",
+    "\n",
+    "@dlt.table(\n",
+    "  name='taxi_filtered'\n",
+    ")\n",
+    "def filtered_taxis():\n",
+    "  return dlt.read(\"taxi_raw\").filter(expr(\"fare_amount < 30\"))"
+   ]
+  }
+ ],
+ "metadata": {
+  "application/vnd.databricks.v1+notebook": {
+   "dashboards": [],
+   "language": "python",
+   "notebookMetadata": {
+    "pythonIndentUnit": 2
+   },
+   "notebookName": "dlt_pipeline",
+   "widgets": {}
+  },
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.11.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
diff --git a/knowledge_base/scalable_mono_repo_de/src/python/serverless_job.py b/knowledge_base/scalable_mono_repo_de/src/python/serverless_job.py
new file mode 100644
index 0000000..9507b64
--- /dev/null
+++ b/knowledge_base/scalable_mono_repo_de/src/python/serverless_job.py
@@ -0,0 +1,25 @@
+from importlib.metadata import version
+from get_taxis_data.main import get_spark, get_taxis_data
+from utils.main import add_processing_timestamp
+
+
+def get_package_versions(packages: list[str]) -> None:
+    """
+    Verify our package installs by printing each package's installed version.
+    """
+    for package in packages:
+        print(f"{package} is installed & has version: {version(package)}")
+
+
+def main():
+    # Verify that we have our packages installed
+    get_package_versions(["cowsay", "boto3", "get_taxis_data", "utils"])
+
+    # `spark` is not pre-defined in a plain Python file task (unlike in a notebook),
+    # so re-use the get_spark() helper from the get_taxis_data package.
+    spark = get_spark()
+
+    # Use our packages to read in taxi data & create a processing timestamp column
+    df = get_taxis_data(spark)
+    df = add_processing_timestamp(df=df)
+    df.show(5)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/knowledge_base/scalable_mono_repo_de/tests/get_taxis_data/test_entrypoint.py
b/knowledge_base/scalable_mono_repo_de/tests/get_taxis_data/test_entrypoint.py new file mode 100644 index 0000000..0c76dd9 --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/tests/get_taxis_data/test_entrypoint.py @@ -0,0 +1,11 @@ +import pytest +from pyspark.sql import SparkSession +from src.packages.get_taxis_data.get_taxis_data.main import * + +@pytest.fixture +def init_spark(): + return get_spark() + +def test_entrypoint(init_spark: SparkSession): + taxis = get_taxis_data(init_spark) + assert taxis.count() > 5 \ No newline at end of file diff --git a/knowledge_base/scalable_mono_repo_de/tests/utils/test_utils.py b/knowledge_base/scalable_mono_repo_de/tests/utils/test_utils.py new file mode 100644 index 0000000..f34141e --- /dev/null +++ b/knowledge_base/scalable_mono_repo_de/tests/utils/test_utils.py @@ -0,0 +1,19 @@ +import pytest +from pyspark.sql import SparkSession, DataFrame +from src.packages.get_taxis_data.get_taxis_data.main import get_spark +from src.packages.utils.utils.main import * + + +@pytest.fixture +def init_spark(): + return get_spark() + + +@pytest.fixture +def generate_dataframe(init_spark: SparkSession): + return init_spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "name"]) + + +def test_processing_timestamp(generate_dataframe: DataFrame): + df = add_processing_timestamp(df=generate_dataframe) + assert "processing_timestamp" in df.columns \ No newline at end of file
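+
+
+# A possible extra check (not part of the original repo): assert that the new column
+# actually has a timestamp type. This is only a sketch & re-uses the fixtures above.
+def test_processing_timestamp_type(generate_dataframe: DataFrame):
+    from pyspark.sql.types import TimestampType
+
+    df = add_processing_timestamp(df=generate_dataframe)
+    assert isinstance(df.schema["processing_timestamp"].dataType, TimestampType)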