10 changes: 10 additions & 0 deletions knowledge_base/udtf-operator/.gitignore
@@ -0,0 +1,10 @@
.databricks/
build/
dist/
__pycache__/
*.egg-info
.venv/
scratch/**
!scratch/README.md
**/explorations/**
!**/explorations/README.md
103 changes: 103 additions & 0 deletions knowledge_base/udtf-operator/README.md
@@ -0,0 +1,103 @@
# User-Defined Table Functions in Unity Catalog

This project demonstrates how to create and register a Python User-Defined Table Function (UDTF) in Unity Catalog using Databricks Asset Bundles. Once registered, the UDTF becomes available to analysts and other users across your Databricks workspace, callable directly from SQL queries.

**Learn more:** [Introducing Python UDTFs in Unity Catalog](https://www.databricks.com/blog/introducing-python-user-defined-table-functions-udtfs-unity-catalog)

## Concrete Example: Definition and Usage

This project includes a k-means clustering algorithm as a UDTF.

### Python Implementation

The UDTF is defined in [`src/kmeans_udtf.py`](src/kmeans_udtf.py) as follows:

```python
from pyspark.sql import Row


class SklearnKMeans:
    def __init__(self, id_column: str, columns: list, k: int):
        self.id_column = id_column
        self.columns = columns
        self.k = k
        self.data = []

    def eval(self, row: Row):
        # Buffer each input row; k-means needs the full dataset before fitting.
        self.data.append(row)

    def terminate(self):
        # Fit the model on the buffered rows, then yield one result per input row.
        # ... clustering logic ...
        for record in results:
            yield (record.id, record.cluster)
```
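
The clustering step is elided in the excerpt above. As a rough illustration (a sketch, not the project's actual implementation), `terminate` could fit scikit-learn's `KMeans` on the buffered rows, assuming each `Row` exposes the id column and the numeric feature columns by name:

```python
import numpy as np
from sklearn.cluster import KMeans


class SklearnKMeans:
    # __init__ and eval as in the excerpt above.

    def terminate(self):
        # Illustrative sketch only: build a feature matrix from the buffered
        # rows, fit k-means, and emit one (id, cluster_id) pair per input row.
        # Ids are cast to string to match the join in the sample notebook.
        features = np.array([[row[col] for col in self.columns] for row in self.data])
        labels = KMeans(n_clusters=self.k, n_init="auto").fit_predict(features)
        for row, label in zip(self.data, labels):
            yield (str(row[self.id_column]), int(label))
```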

### SQL Usage

Once registered, any analyst or SQL user in your workspace can call the UDTF from SQL queries:

```sql
SELECT * FROM main.your_schema.k_means(
    input_data => TABLE(SELECT * FROM my_data),
    id_column  => 'id',
    columns    => array('feature1', 'feature2', 'feature3'),
    k          => 3
)
```

The UDTF integrates seamlessly with:
- SQL queries in notebooks
- Databricks SQL dashboards
- Any tool that connects to your Databricks workspace via SQL

See [`explorations/sample_notebook.ipynb`](explorations/sample_notebook.ipynb) for complete examples.

## Getting Started With This Project

### Prerequisites

* Databricks workspace with Unity Catalog enabled
* Databricks CLI installed and configured
* Python 3.10–3.12 (see `pyproject.toml`) with the `uv` package manager

### Setup and Testing

1. Install dependencies:
```bash
uv sync --dev
```

2. Run the tests, which register and execute the UDTF in your workspace (a sketch of such a test follows below):
```bash
uv run pytest
```
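
The test files themselves are not part of this diff; a hypothetical sketch of what one might look like (the schema, table, and assertions here are assumptions):

```python
# tests/test_k_means.py — a hypothetical sketch; the actual tests are not shown here.
from databricks.connect import DatabricksSession


def test_k_means_assigns_every_row_to_a_cluster():
    spark = DatabricksSession.builder.getOrCreate()
    rows = spark.sql("""
        SELECT * FROM main.my_schema.k_means(
            input_data => TABLE(SELECT * FROM main.test.titanic_sample),
            id_column  => 'PassengerId',
            columns    => array('Age', 'Pclass', 'Survived'),
            k          => 3
        )
    """).collect()
    assert len(rows) > 0
    assert all(row.cluster_id is not None for row in rows)
```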

### Deployment

Deploy to dev:
```bash
databricks bundle deploy --target dev
databricks bundle run register_udtf_job --target dev
```

Deploy to production:
```bash
databricks bundle deploy --target prod
databricks bundle run register_udtf_job --target prod
```

The UDTF will be registered at `main.your_username.k_means` (dev) or `main.prod.k_means` (prod).
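
To confirm the function landed where you expect, describe it from any SQL context; a quick check over Databricks Connect might look like this (the schema name is a placeholder — substitute the one your target deployed to):

```python
from databricks.connect import DatabricksSession

# Post-deploy sanity check (assumes a configured Databricks Connect profile).
spark = DatabricksSession.builder.getOrCreate()
spark.sql("DESCRIBE FUNCTION EXTENDED main.your_username.k_means").show(truncate=False)
```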

## Advanced Topics

**CI/CD Integration:**
- Set up CI/CD for Databricks Asset Bundles following the [CI/CD documentation](https://docs.databricks.com/dev-tools/bundles/ci-cd.html)
- To automatically register the UDTF on deployment, add `databricks bundle run -t prod register_udtf_job` to your deployment script after `databricks bundle deploy -t prod` (alternatively, rely on the schedule defined in `resources/register_udtf_job.yml`, which re-registers the UDTF daily)

**Serverless compute vs. clusters:** The job uses serverless compute by default (see the `environments` block in `resources/register_udtf_job.yml`); swap in a cluster spec there if your workspace requires one.

**Configuration:** Customize the target catalog and schema in `databricks.yml` or via the job's `catalog`/`schema` parameters.

## Learn More

- [Introducing Python UDTFs in Unity Catalog](https://www.databricks.com/blog/introducing-python-user-defined-table-functions-udtfs-unity-catalog) - Blog post covering UDTF concepts and use cases
- [Python UDTFs Documentation](https://docs.databricks.com/udf/udtf-unity-catalog.html) - Official documentation
- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/index.html) - CI/CD and deployment framework
- [Unity Catalog Functions](https://docs.databricks.com/udf/unity-catalog.html) - Governance and sharing
42 changes: 42 additions & 0 deletions knowledge_base/udtf-operator/databricks.yml
@@ -0,0 +1,42 @@
# This is a Databricks asset bundle definition for udtf_operator.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: udtf_operator
  uuid: 67b02248-6ca2-4230-8ce5-fc7f5e37c7a5

include:
  - resources/*.yml
  - resources/*/*.yml

# Variable declarations. These variables are assigned in the dev/prod targets below.
variables:
  catalog:
    description: The catalog to use
  schema:
    description: The schema to use

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://company.databricks.com
    variables:
      catalog: main
      schema: ${workspace.current_user.short_name}
  prod:
    mode: production
    workspace:
      host: https://company.databricks.com
      # We explicitly deploy to /Workspace/Users/user@company.com to make sure we only have a single copy.
      root_path: /Workspace/Users/user@company.com/.bundle/${bundle.name}/${bundle.target}
    variables:
      catalog: main
      schema: prod
    permissions:
      - user_name: user@company.com
        level: CAN_MANAGE
108 changes: 108 additions & 0 deletions knowledge_base/udtf-operator/explorations/sample_notebook.ipynb
@@ -0,0 +1,108 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# K-Means UDTF Sample Usage\n",
"\n",
"This notebook demonstrates how to use the k-means UDTF with the Titanic dataset."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>num_affected_rows</th>\n",
" <th>num_inserted_rows</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>10</td>\n",
" <td>10</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>"
],
"text/plain": [
"DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create sample Titanic data\n",
"spark.sql(\"\"\"\n",
" CREATE OR REPLACE TABLE main.test.titanic_sample (\n",
" PassengerId INT,\n",
" Age DOUBLE,\n",
" Pclass INT,\n",
" Survived INT\n",
" )\n",
"\"\"\")\n",
"\n",
"spark.sql(\"\"\"\n",
" INSERT INTO main.test.titanic_sample VALUES\n",
" (1, 22.0, 3, 0),\n",
" (2, 38.0, 1, 1),\n",
" (3, 26.0, 3, 1),\n",
" (4, 35.0, 1, 1),\n",
" (5, 35.0, 3, 0),\n",
" (6, 54.0, 1, 0),\n",
" (7, 2.0, 3, 0),\n",
" (8, 27.0, 1, 0),\n",
" (9, 14.0, 3, 1),\n",
" (10, 4.0, 3, 1)\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Use k-means UDTF to cluster passengers\n# Assumes the UDTF has been deployed to main.${user}.k_means\n\nspark.sql(\"\"\"\n WITH kmeans AS (\n SELECT *\n FROM main.lennart.k_means(\n input_data => TABLE(SELECT * FROM main.test.titanic_sample),\n id_column => 'PassengerId',\n columns => array('Age', 'Pclass', 'Survived'),\n k => 3\n )\n )\n SELECT \n t.*,\n k.cluster_id\n FROM main.test.titanic_sample t\n JOIN kmeans k ON CAST(t.PassengerId AS STRING) = k.id\n ORDER BY t.PassengerId\n\"\"\").display()"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Analyze cluster characteristics\nspark.sql(\"\"\"\n WITH kmeans AS (\n SELECT *\n FROM main.lennart.k_means(\n input_data => TABLE(SELECT * FROM main.test.titanic_sample),\n id_column => 'PassengerId',\n columns => array('Age', 'Pclass', 'Survived'),\n k => 3\n )\n )\n SELECT \n k.cluster_id,\n COUNT(*) as count,\n AVG(t.Age) as avg_age,\n AVG(t.Pclass) as avg_class,\n AVG(t.Survived) as survival_rate\n FROM main.test.titanic_sample t\n JOIN kmeans k ON CAST(t.PassengerId AS STRING) = k.id\n GROUP BY k.cluster_id\n ORDER BY k.cluster_id\n\"\"\").display()"
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.11"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
9 changes: 9 additions & 0 deletions knowledge_base/udtf-operator/fixtures/titanic_sample.csv
@@ -0,0 +1,9 @@
PassengerId,Age,Pclass,Survived
1,22.0,3,0
2,38.0,1,1
3,26.0,3,1
4,35.0,1,1
5,35.0,3,0
6,54.0,1,0
7,2.0,3,0
8,27.0,1,0
30 changes: 30 additions & 0 deletions knowledge_base/udtf-operator/pyproject.toml
@@ -0,0 +1,30 @@
[project]
name = "udtf_operator"
version = "0.0.1"
authors = [{ name = "lennart.kats@databricks.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
    "scikit-learn>=1.3.0",
    "numpy>=1.24.0",
]

[dependency-groups]
dev = [
    "pytest",
    "databricks-dlt",
    "databricks-connect>=15.4,<15.5",
    "ipykernel",
]

[project.scripts]
main = "udtf_operator.main:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src"]

[tool.black]
line-length = 125
26 changes: 26 additions & 0 deletions knowledge_base/udtf-operator/resources/register_udtf_job.yml
@@ -0,0 +1,26 @@
resources:
  jobs:
    register_udtf_job:
      name: register_udtf_job
      schedule:
        quartz_cron_expression: '0 0 8 * * ?' # daily at 8am
        timezone_id: "UTC"
      parameters:
        - name: catalog
          default: ${var.catalog}
        - name: schema
          default: ${var.schema}
      tasks:
        - task_key: create_udtf
          spark_python_task:
            python_file: ../src/main.py
            parameters:
              - "--catalog"
              - "{{job.parameters.catalog}}"
              - "--schema"
              - "{{job.parameters.schema}}"
          environment_key: default
      environments:
        - environment_key: default
          spec:
            environment_version: "4"