279 changes: 268 additions & 11 deletions modules/develop/pages/data-transforms/build.adoc
@@ -1,10 +1,21 @@
= Develop Data Transforms
:description: Learn how to initialize a data transforms project and write transform functions in your chosen language.
:page-categories: Development, Stream Processing, Data Transforms
:page-topic-type: how-to
:personas: streaming_developer, application_developer
:learning-objective-1: Initialize a data transforms project with the rpk CLI
:learning-objective-2: Build transform functions that process records and write to output topics
:learning-objective-3: Implement multi-topic routing patterns with Schema Registry integration
// tag::single-source[]

{description}

After reading this page, you will be able to:

* [ ] {learning-objective-1}
* [ ] {learning-objective-2}
* [ ] {learning-objective-3}

== Prerequisites

You must have the following development tools installed on your host machine:
@@ -44,15 +55,15 @@ To initialize a data transforms project, use the following command to set up the
rpk transform init --language=<language> --name=<name>
----

If you do not include the `--language` flag, the command will prompt you for the language. Supported languages include:
If you do not include the `--language` flag, the command prompts you for the language. Supported languages include:

* `tinygo-no-goroutines` (does not include https://golangdocs.com/goroutines-in-golang[Goroutines])
* `tinygo-with-goroutines`
* `rust`
* `javascript`
* `typescript`

For example, if you choose `tinygo-no-goroutines`, the following project files are created:
For example, if you choose `tinygo-no-goroutines`, `rpk` creates the following project files:

[.no-copy]
----
@@ -165,7 +176,7 @@ await esbuild.build({
process: false, // Don't inject the process global, the Redpanda JavaScript runtime does that.
},
polyfills: {
= crypto: true, // Enable crypto polyfill
crypto: true, // Enable crypto polyfill
// Add other polyfills as needed
},
}),
@@ -180,7 +191,7 @@ await esbuild.build({

By distinguishing between recoverable and critical errors, you can ensure that your transform functions are both resilient and robust. Handling recoverable errors internally helps maintain continuous operation, while allowing critical errors to escape ensures that the system can address severe issues effectively.

Redpanda tracks the offsets of records that have been processed by transform functions. If an error escapes the Wasm virtual machine (VM), the VM will fail. When the Wasm engine detects this failure and starts a new VM, the transform function will retry processing the input topics from the last processed offset, potentially leading to repeated failures if the underlying issue is not resolved.
Redpanda tracks the offsets of records that transform functions have processed. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects this failure and starts a new VM, the transform function retries processing the input topics from the last processed offset. If the underlying issue is not resolved, this can lead to repeated failures.

Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation. However, this approach can result in silently discarding problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.
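
The distinction between recoverable and critical errors can be sketched in plain Go, without the transform SDK. This is a minimal sketch under stated assumptions: `handleRecord`, `errMissingConfig`, and the `configLoaded` flag are hypothetical names, and treating malformed JSON as recoverable and missing configuration as critical is an illustrative choice, not a rule from Redpanda.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// errMissingConfig is a hypothetical critical error: the function cannot do
// useful work until the problem is fixed, so the error should escape.
var errMissingConfig = errors.New("required configuration is missing")

// handleRecord is a hypothetical stand-in for a transform callback.
// It returns a non-nil error only for critical failures.
func handleRecord(value []byte, configLoaded bool) error {
	if !configLoaded {
		// Critical: return the error so that it escapes the function.
		return fmt.Errorf("cannot process record: %w", errMissingConfig)
	}
	if !json.Valid(value) {
		// Recoverable: log the problem and skip this record.
		log.Printf("skipping malformed record (%d bytes)", len(value))
		return nil
	}
	// Normal processing would happen here.
	return nil
}

func main() {
	fmt.Println(handleRecord([]byte(`{"ok":true}`), true))  // valid record
	fmt.Println(handleRecord([]byte(`{"ok":`), true))       // malformed, logged and skipped
	fmt.Println(handleRecord([]byte(`{"ok":true}`), false)) // critical error is returned
}
```

In this sketch the malformed record leaves no trace other than a log line, so the logs are the only place to notice discarded data.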

@@ -401,14 +412,26 @@ onRecordWritten((event, writer) => {

=== Write to specific output topics

You can configure your transform function to write records to specific output topics. This is useful for filtering or routing messages based on certain criteria. The following example shows a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.
You can configure your transform function to write records to specific output topics based on message content, enabling powerful routing and fan-out patterns. This capability is useful for:

* Filtering messages by criteria and routing to different topics
* Fan-out patterns that distribute data from one input topic to multiple output topics
* Event routing based on message type or schema
* Data distribution for downstream consumers

Wasm transforms provide a simpler alternative to external connectors like Kafka Connect for in-broker data routing, with lower latency and no additional infrastructure to manage.

==== Basic JSON validation example

The following example shows a filter that outputs only valid JSON from the input topic into the output topic. The transform writes invalid JSON to a different output topic.

[tabs]
======
Go::
+
--
```go
[source,go]
----
import (
"encoding/json"
"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
@@ -420,17 +443,18 @@ func main() {

func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
if json.Valid(event.Record().Value) {
return w.Write(e.Record())
return writer.Write(event.Record())
}
// Send invalid records to separate topic
return writer.Write(e.Record(), transform.ToTopic("invalid-json"))
return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
}
```
----
--
Rust::
+
--
```rust
[source,rust]
----
use anyhow::Result;
use redpanda_transform_sdk::*;

@@ -448,7 +472,7 @@ fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()>
}
Ok(())
}
```
----
--
JavaScript::
+
@@ -458,6 +482,239 @@ The JavaScript SDK does not support writing records to a specific output topic.
--
======
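
To see which payloads the JSON filter would send where, the following standalone sketch mirrors its decision without the SDK. `topicFor` is a hypothetical helper, and the empty string is an assumed placeholder for the transform's default output topic.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invalidTopic matches the topic name used in the JSON validation example.
const invalidTopic = "invalid-json"

// topicFor is a hypothetical helper that mirrors the filter's decision.
// An empty string stands for the transform's default output topic.
func topicFor(value []byte) string {
	if json.Valid(value) {
		return ""
	}
	return invalidTopic
}

func main() {
	inputs := []string{`{"id":1}`, `42`, `{"id":`, ``}
	for _, in := range inputs {
		fmt.Printf("%q -> %q\n", in, topicFor([]byte(in)))
	}
}
```

`json.Valid` accepts any complete JSON value, so a bare scalar such as `42` passes the filter, while a truncated object or an empty payload is routed to `invalid-json`.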

[[multi-topic-fanout]]
==== Multi-topic fan-out with Schema Registry

This example routes batched updates from a single input topic to multiple output topics based on a routing field in each message. Messages are encoded with the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Registry wire format] for validation against the output topic schema. Consider using this pattern with Iceberg-enabled topics to fan out data directly into lakehouse tables.
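
The wire format prefixes each payload with a 5-byte header: one magic byte (`0`) followed by the schema ID as a 4-byte big-endian integer. The following is a minimal sketch of that framing using only the Go standard library. `encodeWireFormat` and `decodeWireFormat` are hypothetical helper names, not part of any Redpanda SDK.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	magicByte = 0 // first byte of every framed value
	headerLen = 5 // magic byte + 4-byte schema ID
)

// encodeWireFormat prepends the magic byte and big-endian schema ID to payload.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	out := make([]byte, headerLen, headerLen+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:headerLen], schemaID)
	return append(out, payload...)
}

// decodeWireFormat validates the header and returns the schema ID and payload.
func decodeWireFormat(value []byte) (uint32, []byte, error) {
	if len(value) < headerLen {
		return 0, nil, errors.New("value is shorter than the 5-byte header")
	}
	if value[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", value[0])
	}
	return binary.BigEndian.Uint32(value[1:headerLen]), value[headerLen:], nil
}

func main() {
	encoded := encodeWireFormat(1, []byte(`{"order_id":"123"}`))
	id, payload, err := decodeWireFormat(encoded)
	fmt.Println(id, string(payload), err)
}
```

A decoder like this can be useful on the consumer side to check that routed records carry the schema ID you expect before deserializing the payload.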

.Input message example
[,json]
----
{
"updates": [
{"table": "orders", "data": {"order_id": "123", "amount": 99.99}},
{"table": "inventory", "data": {"product_id": "P456", "quantity": 50}},
{"table": "customers", "data": {"customer_id": "C789", "name": "Jane"}}
]
}
----

xref:develop:data-transforms/configure.adoc[Configure the transform] with multiple output topics:

[,yaml]
----
name: event-router
input_topic: events
output_topics:
- orders
- inventory
- customers
----

The transform extracts each update and routes it to the appropriate topic based on the `table` field. This example assumes that you have already created each output topic and registered the corresponding schemas in Schema Registry.

NOTE: xref:manage:schema-reg/schema-reg-api.adoc[Register schemas in Schema Registry] before deploying the transform. Replace the hardcoded schema IDs (1, 2, 3) in the code examples with the IDs returned by Schema Registry during registration. Use the `{topic-name}-value` naming convention for schema subjects (for example, `orders-value`, `inventory-value`).

[tabs]
======
Go::
+
--
.`go.mod`
[%collapsible]
====
[,go]
----
module fanout-example

go 1.20

require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.1.0 // v1.1.0+ required
----
====

`transform.go`:

[,go]
----
package main

import (
"encoding/binary"
"encoding/json"
"log"
"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// Input message structure with array of updates
type BatchMessage struct {
Updates []TableUpdate `json:"updates"`
}

// Individual table update with routing field
type TableUpdate struct {
Table string `json:"table"` // Routing field - determines output topic
Data json.RawMessage `json:"data"` // The actual data to write
}

// Schema IDs for each output topic obtained from Schema Registry.
// Register schemas before deploying the transform using the {topic-name}-value naming convention
var schemaIDs = map[string]int{
"orders": 1,
"inventory": 2,
"customers": 3,
}

func main() {
log.Printf("Starting fanout transform with schema IDs: %v", schemaIDs)
transform.OnRecordWritten(routeUpdates)
}

func routeUpdates(event transform.WriteEvent, writer transform.RecordWriter) error {
var batch BatchMessage
if err := json.Unmarshal(event.Record().Value, &batch); err != nil {
log.Printf("Failed to parse batch message: %v", err)
return nil // Skip invalid records
}

// Process each update in the batch
for i, update := range batch.Updates {
schemaID, exists := schemaIDs[update.Table]
if !exists {
log.Printf("Unknown table in update %d: %s", i, update.Table)
continue
}

if err := writeUpdate(update, schemaID, writer, event); err != nil {
log.Printf("Failed to write update %d to %s: %v", i, update.Table, err)
}
}

return nil
}

func writeUpdate(update TableUpdate, schemaID int, writer transform.RecordWriter, event transform.WriteEvent) error {
// Create Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...]
value := make([]byte, 5)
value[0] = 0 // magic byte
binary.BigEndian.PutUint32(value[1:5], uint32(schemaID))
value = append(value, update.Data...)

record := transform.Record{
Key: event.Record().Key,
Value: value,
}

return writer.Write(record, transform.ToTopic(update.Table))
}
----
--

Rust::
+
--
.`Cargo.toml`
[%collapsible]
====
[,toml]
----
[package]
name = "fanout-rust-example"
version = "0.1.0"
edition = "2021"

[dependencies]
redpanda-transform-sdk = "1.1.0" # v1.1.0+ required for WriteOptions API
serde = { version = "1", features = ["derive"] }
serde_json = "1"

[profile.release]
opt-level = "z"
lto = true
strip = true
----
====

`src/main.rs`:

[,rust]
----
use redpanda_transform_sdk::*;
use serde::Deserialize;
use std::collections::HashMap;
use std::error::Error;

#[derive(Deserialize)]
struct BatchMessage {
updates: Vec<TableUpdate>,
}

#[derive(Deserialize)]
struct TableUpdate {
table: String,
data: serde_json::Value,
}

// Schema IDs for each output topic obtained from Schema Registry.
// Register schemas before deploying the transform using the {topic-name}-value naming convention
static mut SCHEMA_IDS: Option<HashMap<String, i32>> = None;

fn main() {
let mut schema_ids = HashMap::new();
schema_ids.insert("orders".to_string(), 1);
schema_ids.insert("inventory".to_string(), 2);
schema_ids.insert("customers".to_string(), 3);

unsafe {
SCHEMA_IDS = Some(schema_ids);
}

on_record_written(route_updates);
}

fn write_update(
update: &TableUpdate,
schema_id: i32,
writer: &mut RecordWriter,
event: &WriteEvent,
) -> Result<(), Box<dyn Error>> {
// Create Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...]
let mut value = vec![0u8; 5];
value[0] = 0; // magic byte
value[1..5].copy_from_slice(&schema_id.to_be_bytes());

let data_bytes = serde_json::to_vec(&update.data)?;
value.extend_from_slice(&data_bytes);

let key = event.record.key().map(|k| k.to_vec());
let record = BorrowedRecord::new(key.as_deref(), Some(&value));

writer.write_with_options(record, WriteOptions::to_topic(&update.table))?;
Ok(())
}

fn route_updates(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
let batch: BatchMessage = serde_json::from_slice(event.record.value().unwrap_or_default())?;
let schema_ids = unsafe { SCHEMA_IDS.as_ref().unwrap() };

for update in batch.updates.iter() {
if let Some(&schema_id) = schema_ids.get(&update.table) {
write_update(update, schema_id, writer, &event)?;
}
}

Ok(())
}
----
--

JavaScript::
+
--
The JavaScript SDK does not support writing records to specific output topics. For multi-topic fan-out, use the Go or Rust SDK.
--
======

=== Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas as well as serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.