Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions packages/pynumaflow/examples/accumulator/blackhole/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
####################################################################################################
# Stage 1: Builder - installs all dependencies using uv
####################################################################################################
FROM ghcr.io/astral-sh/uv:python3.13-trixie AS builder

ENV PYSETUP_PATH="/opt/pysetup"
WORKDIR $PYSETUP_PATH

COPY pyproject.toml uv.lock README.md ./
COPY pynumaflow/ ./pynumaflow/

ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/blackhole"
COPY examples/accumulator/blackhole/ $EXAMPLE_PATH/

WORKDIR $EXAMPLE_PATH
RUN uv sync --no-dev --no-install-project --frozen

####################################################################################################
# Stage 2: Runtime - clean image with only installed packages
####################################################################################################
FROM ghcr.io/astral-sh/uv:python3.13-trixie AS udf

ENV PYSETUP_PATH="/opt/pysetup"
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/accumulator/blackhole"

WORKDIR $EXAMPLE_PATH
COPY --from=builder $EXAMPLE_PATH/.venv $EXAMPLE_PATH/.venv
COPY --from=builder $EXAMPLE_PATH/ $EXAMPLE_PATH/

# NOTE: We cannot use "uv run python example.py" here because uv run reads the
# example's pyproject.toml, finds the pynumaflow path source (path = "../../../"),
# and tries to resolve it. In the runtime stage, the parent pynumaflow source tree
# is not present (by design, to keep the image small), so uv run fails.
# Instead, we activate the pre-built .venv via PATH and run python directly.
ENV PATH="$EXAMPLE_PATH/.venv/bin:$PATH"
CMD ["python", "example.py"]

EXPOSE 5000
22 changes: 22 additions & 0 deletions packages/pynumaflow/examples/accumulator/blackhole/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/accumulator-blackhole:${TAG}
DOCKER_FILE_PATH = examples/accumulator/blackhole/Dockerfile

.PHONY: update
update:
uv lock --check || uv lock

.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
46 changes: 46 additions & 0 deletions packages/pynumaflow/examples/accumulator/blackhole/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Blackhole Accumulator

An example User Defined Function that demonstrates a "blackhole" accumulator: it intentionally
discards every datum it receives without forwarding any data to the next vertex.

### Why emit drop messages instead of nothing?

An accumulator that simply reads its input and emits nothing leaves the framework unable to release
the per-datum tracked state (WAL), which leads to unbounded memory growth (see
[numaflow-python#356](https://github.com/numaproj/numaflow-python/issues/356)).

To get "blackhole" semantics without leaking memory, this example emits a *drop* message for every
datum using `Message.to_drop(datum)`. A drop message is not forwarded downstream, but it still lets
the framework advance the watermark and release the tracked state for that datum.

This pattern is useful for multiplexer-, cross-join-, or filter-style accumulators that legitimately
need to omit some (or all) of their inputs.

### Applying the Pipeline

To apply the pipeline, use the following command:

```shell
kubectl apply -f pipeline.yaml
```

### Publish messages

Port-forward the HTTP endpoint, and make POST requests using curl. Remember to replace xxxx with the appropriate pod names.

```shell
kubectl port-forward blackhole-http-one-0-xxxx 8444:8443

# Post data to the HTTP endpoint
curl -kq -X POST -d "101" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 60000"
curl -kq -X POST -d "102" https://localhost:8444/vertices/http-one -H "X-Numaflow-Event-Time: 61000"
```

### Verify the output

```shell
kubectl logs -f blackhole-py-sink-0-xxxx
```

The sink receives nothing - every datum is dropped by the accumulator - while the accumulator logs
each datum it drops.
56 changes: 56 additions & 0 deletions packages/pynumaflow/examples/accumulator/blackhole/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
import os
from collections.abc import AsyncIterable

from pynumaflow import setup_logging
from pynumaflow.accumulator import Accumulator, AccumulatorAsyncServer
from pynumaflow.accumulator import (
Message,
Datum,
)
from pynumaflow.shared.asynciter import NonBlockingIterator

_LOGGER = setup_logging(__name__)
if os.getenv("PYTHONDEBUG"):
_LOGGER.setLevel(logging.DEBUG)


class Blackhole(Accumulator):
"""Blackhole is an accumulator that intentionally discards every datum it receives
without forwarding any data downstream.

A naive implementation would simply read the input stream and emit nothing. However, an
accumulator that never emits anything for the datums it consumes leaves the framework unable
to release the per-datum tracked state, leading to unbounded memory growth.

Instead, this example emits a drop message for every datum using ``Message.to_drop(datum)``.
A drop message is not forwarded to the next vertex, but it still allows the framework to
advance the watermark and release the tracked state (WAL) for that datum - giving us
"blackhole" semantics without leaking memory. This pattern is useful for multiplexer-,
cross-join-, or filter-style accumulators that legitimately need to omit some (or all) of
their inputs.
"""

def __init__(self):
_LOGGER.info("Blackhole initialized")

async def handler(
self,
datums: AsyncIterable[Datum],
output: NonBlockingIterator,
):
_LOGGER.info("Blackhole handler started")
async for datum in datums:
_LOGGER.info(
f"Dropping datum with event time: {datum.event_time}, "
f"watermark: {datum.watermark}"
)
# Emit a drop message: nothing is forwarded downstream, but the framework still
# advances the watermark and releases the tracked state for this datum.
await output.put(Message.to_drop(datum))
_LOGGER.info("Timeout reached")


if __name__ == "__main__":
grpc_server = AccumulatorAsyncServer(Blackhole)
grpc_server.start()
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: blackhole
spec:
limits:
readBatchSize: 1
watermark:
maxDelay: 5s
idleSource:
# Duration to be added to the current watermark to progress the watermark when source is idling
incrementBy: 1s
# Duration between the subsequent increment of the watermark as long the source remains idle.
stepInterval: 1s
# Duration after which a source is marked as idle due to lack of data. If determined idle then the watermark is progressed by `incrementBy`
threshold: 5s
# Duration after which, if source doesn't produce any data (from the pipeline's inception), the watermark is initialized with the current wall clock time.
initSourceDelay: 30s
vertices:
- name: http-one
scale:
min: 1
max: 1
source:
http: {}
- name: http-two
scale:
min: 1
max: 1
source:
http: {}
- name: py-accum
udf:
container:
image: quay.io/numaio/numaflow-python/accumulator-blackhole:stable
imagePullPolicy: Always
env:
- name: PYTHONDEBUG
value: "true"
groupBy:
window:
accumulator:
timeout: 10s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 1Gi
- name: py-sink
scale:
min: 1
max: 1
sink:
log: {}
edges:
- from: http-one
to: py-accum
- from: http-two
to: py-accum
- from: py-accum
to: py-sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[project]
name = "accumulator-blackhole"
version = "0.2.4"
description = ""
requires-python = ">=3.13"
dependencies = [
"pynumaflow",
]

[tool.uv.sources]
pynumaflow = { path = "../../../" }

[tool.hatch.build.targets.wheel]
packages = []

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Loading
Loading