diff --git a/modules/develop/pages/data-transforms/build.adoc b/modules/develop/pages/data-transforms/build.adoc
index 14ac95d8da..9034bf1106 100644
--- a/modules/develop/pages/data-transforms/build.adoc
+++ b/modules/develop/pages/data-transforms/build.adoc
@@ -1,10 +1,21 @@
 = Develop Data Transforms
 :description: Learn how to initialize a data transforms project and write transform functions in your chosen language.
 :page-categories: Development, Stream Processing, Data Transforms
+:page-topic-type: how-to
+:personas: streaming_developer, application_developer
+:learning-objective-1: Initialize a data transforms project with the rpk CLI
+:learning-objective-2: Build transform functions that process records and write to output topics
+:learning-objective-3: Implement multi-topic routing patterns with Schema Registry integration
 // tag::single-source[]
 
 {description}
 
+After reading this page, you will be able to:
+
+* [ ] {learning-objective-1}
+* [ ] {learning-objective-2}
+* [ ] {learning-objective-3}
+
 == Prerequisites
 
 You must have the following development tools installed on your host machine:
@@ -44,7 +55,7 @@ To initialize a data transforms project, use the following command to set up the
 rpk transform init --language=<language> --name=<name>
 ----
 
-If you do not include the `--language` flag, the command will prompt you for the language. Supported languages include:
+If you do not include the `--language` flag, the command prompts you for the language. Supported languages include:
 
 * `tinygo-no-goroutines` (does not include https://golangdocs.com/goroutines-in-golang[Goroutines])
 * `tinygo-with-goroutines`
@@ -52,7 +63,7 @@ If you do not include the `--language` flag, the command will prompt you for the
 * `javascript`
 * `typescript`
 
-For example, if you choose `tinygo-no-goroutines`, the following project files are created:
+For example, if you choose `tinygo-no-goroutines`, `rpk` creates the following project files:
 
 [.no-copy]
 ----
@@ -165,7 +176,7 @@ await esbuild.build({
       process: false, // Don't inject the process global, the Redpanda JavaScript runtime does that.
     },
     polyfills: {
-= crypto: true, // Enable crypto polyfill
+      crypto: true, // Enable crypto polyfill
       // Add other polyfills as needed
     },
   }),
@@ -180,7 +191,7 @@
 By distinguishing between recoverable and critical errors, you can ensure that your transform functions are both resilient and robust. Handling recoverable errors internally helps maintain continuous operation, while allowing critical errors to escape ensures that the system can address severe issues effectively.
 
-Redpanda tracks the offsets of records that have been processed by transform functions. If an error escapes the Wasm virtual machine (VM), the VM will fail. When the Wasm engine detects this failure and starts a new VM, the transform function will retry processing the input topics from the last processed offset, potentially leading to repeated failures if the underlying issue is not resolved.
+Redpanda tracks the offsets of records that transform functions have processed. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects this failure and starts a new VM, the transform function retries processing the input topics from the last processed offset, potentially leading to repeated failures if the underlying issue is not resolved.
 
 Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation.
 However, this approach can result in silently discarding problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.
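+
+For example, a transform can treat malformed input as recoverable (log and skip the record) while letting infrastructure errors, such as a failed write, escape as critical. The following is a minimal sketch of that pattern using the Go SDK types shown on this page:
+
+[,go]
+----
+package main
+
+import (
+    "encoding/json"
+    "log"
+
+    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
+)
+
+func main() {
+    transform.OnRecordWritten(handleRecord)
+}
+
+func handleRecord(event transform.WriteEvent, writer transform.RecordWriter) error {
+    var payload map[string]any
+    // Recoverable: a malformed record. Log it and return nil so processing continues.
+    // Note that the record is silently discarded unless the logs are monitored.
+    if err := json.Unmarshal(event.Record().Value, &payload); err != nil {
+        log.Printf("Skipping malformed record: %v", err)
+        return nil
+    }
+    // Critical: a failed write escapes the VM, which restarts and retries
+    // processing from the last processed offset.
+    return writer.Write(event.Record())
+}
+----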
@@ -401,14 +412,26 @@ onRecordWritten((event, writer) => {
 === Write to specific output topics
 
-You can configure your transform function to write records to specific output topics. This is useful for filtering or routing messages based on certain criteria. The following example shows a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.
+You can configure your transform function to write records to specific output topics based on message content, enabling routing and fan-out patterns. This capability is useful for:
+
+* Filtering messages by criteria and routing them to different topics
+* Fan-out patterns that distribute data from one input topic to multiple output topics
+* Event routing based on message type or schema
+* Data distribution for downstream consumers
+
+For in-broker data routing, Wasm transforms offer a simpler alternative to external connectors such as Kafka Connect, with lower latency and no additional infrastructure to manage.
+
+==== Basic JSON validation example
+
+The following example shows a filter that outputs only valid JSON from the input topic into the output topic. The transform writes invalid JSON to a different output topic.
 
 [tabs]
 ======
 Go::
 +
 --
-```go
+[source,go]
+----
 import (
     "encoding/json"
     "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
 )
@@ -420,17 +443,18 @@ func main() {
 func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
     if json.Valid(event.Record().Value) {
-        return w.Write(e.Record())
+        return writer.Write(event.Record())
     }
     // Send invalid records to separate topic
-    return writer.Write(e.Record(), transform.ToTopic("invalid-json"))
+    return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
 }
-```
+----
 --
 
 Rust::
 +
 --
-```rust
+[source,rust]
+----
 use anyhow::Result;
 use redpanda_transform_sdk::*;
 
@@ -448,7 +472,7 @@ fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()>
     }
     Ok(())
 }
-```
+----
 --
 
 JavaScript::
 +
@@ -458,6 +482,239 @@ The JavaScript SDK does not support writing records to a specific output topic.
 --
 ======
 
+[[multi-topic-fanout]]
+==== Multi-topic fan-out with Schema Registry
+
+This example routes batched updates from a single input topic to multiple output topics based on a routing field in each message. Each routed message is encoded in the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Registry wire format] so that it can be validated against the output topic's schema. Consider using this pattern with Iceberg-enabled topics to fan out data directly into lakehouse tables.
+
+.Input message example
+[,json]
+----
+{
+  "updates": [
+    {"table": "orders", "data": {"order_id": "123", "amount": 99.99}},
+    {"table": "inventory", "data": {"product_id": "P456", "quantity": 50}},
+    {"table": "customers", "data": {"customer_id": "C789", "name": "Jane"}}
+  ]
+}
+----
+
+xref:develop:data-transforms/configure.adoc[Configure the transform] with multiple output topics:
+
+[,yaml]
+----
+name: event-router
+input_topic: events
+output_topics:
+  - orders
+  - inventory
+  - customers
+----
+
+The transform extracts each update and routes it to the appropriate topic based on the `table` field. The examples below assume that you have already created each output topic and registered the corresponding schemas in Schema Registry.
+
+NOTE: xref:manage:schema-reg/schema-reg-api.adoc[Register schemas in Schema Registry] before deploying the transform. Replace the hardcoded schema IDs (1, 2, 3) in the code examples with the IDs that Schema Registry returns during registration. Use the `{topic-name}-value` naming convention for schema subjects (for example, `orders-value`, `inventory-value`).
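+
+For example, you can create the output topics and register the value schemas with `rpk`. The schema file names below are placeholders for your own schema definitions:
+
+[,bash]
+----
+# Create the three output topics that the transform routes to.
+rpk topic create orders inventory customers
+
+# Register one value schema per topic, following the {topic-name}-value convention.
+# Each command prints the schema ID to use in the transform code.
+rpk registry schema create orders-value --schema orders.avsc
+rpk registry schema create inventory-value --schema inventory.avsc
+rpk registry schema create customers-value --schema customers.avsc
+----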
+
+[tabs]
+======
+Go::
++
+--
+.`go.mod`
+[%collapsible]
+====
+[,go]
+----
+module fanout-example
+
+go 1.20
+
+require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.1.0 // v1.1.0+ required
+----
+====
+
+`transform.go`:
+
+[,go]
+----
+package main
+
+import (
+    "encoding/binary"
+    "encoding/json"
+    "log"
+
+    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
+)
+
+// BatchMessage is the input message structure, containing an array of updates.
+type BatchMessage struct {
+    Updates []TableUpdate `json:"updates"`
+}
+
+// TableUpdate is an individual table update with a routing field.
+type TableUpdate struct {
+    Table string          `json:"table"` // Routing field that determines the output topic
+    Data  json.RawMessage `json:"data"`  // The actual data to write
+}
+
+// Schema IDs for each output topic, obtained from Schema Registry.
+// Register schemas before deploying the transform, using the {topic-name}-value naming convention.
+var schemaIDs = map[string]int{
+    "orders":    1,
+    "inventory": 2,
+    "customers": 3,
+}
+
+func main() {
+    log.Printf("Starting fan-out transform with schema IDs: %v", schemaIDs)
+    transform.OnRecordWritten(routeUpdates)
+}
+
+func routeUpdates(event transform.WriteEvent, writer transform.RecordWriter) error {
+    var batch BatchMessage
+    if err := json.Unmarshal(event.Record().Value, &batch); err != nil {
+        log.Printf("Failed to parse batch message: %v", err)
+        return nil // Skip invalid records
+    }
+
+    // Process each update in the batch.
+    for i, update := range batch.Updates {
+        schemaID, exists := schemaIDs[update.Table]
+        if !exists {
+            log.Printf("Unknown table in update %d: %s", i, update.Table)
+            continue
+        }
+
+        if err := writeUpdate(update, schemaID, writer, event); err != nil {
+            log.Printf("Failed to write update %d to %s: %v", i, update.Table, err)
+        }
+    }
+
+    return nil
+}
+
+func writeUpdate(update TableUpdate, schemaID int, writer transform.RecordWriter, event transform.WriteEvent) error {
+    // Create the Schema Registry wire format: [magic byte, schema ID (4 bytes, big-endian), data...].
+    value := make([]byte, 5)
+    value[0] = 0 // magic byte
+    binary.BigEndian.PutUint32(value[1:5], uint32(schemaID))
+    value = append(value, update.Data...)
+
+    record := transform.Record{
+        Key:   event.Record().Key,
+        Value: value,
+    }
+
+    return writer.Write(record, transform.ToTopic(update.Table))
+}
+----
+--
+
+Rust::
++
+--
+.`Cargo.toml`
+[%collapsible]
+====
+[,toml]
+----
+[package]
+name = "fanout-rust-example"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+redpanda-transform-sdk = "1.1.0" # v1.1.0+ required for the WriteOptions API
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+
+[profile.release]
+opt-level = "z"
+lto = true
+strip = true
+----
+====
+
+`src/main.rs`:
+
+[,rust]
+----
+use redpanda_transform_sdk::*;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::error::Error;
+use std::sync::OnceLock;
+
+#[derive(Deserialize)]
+struct BatchMessage {
+    updates: Vec<TableUpdate>,
+}
+
+#[derive(Deserialize)]
+struct TableUpdate {
+    table: String,
+    data: serde_json::Value,
+}
+
+// Schema IDs for each output topic, obtained from Schema Registry.
+// Register schemas before deploying the transform, using the {topic-name}-value naming convention.
+static SCHEMA_IDS: OnceLock<HashMap<String, i32>> = OnceLock::new();
+
+fn schema_ids() -> &'static HashMap<String, i32> {
+    SCHEMA_IDS.get_or_init(|| {
+        HashMap::from([
+            ("orders".to_string(), 1),
+            ("inventory".to_string(), 2),
+            ("customers".to_string(), 3),
+        ])
+    })
+}
+
+fn main() {
+    on_record_written(route_updates);
+}
+
+fn route_updates(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
+    let batch: BatchMessage = serde_json::from_slice(event.record.value().unwrap_or_default())?;
+
+    for update in batch.updates.iter() {
+        if let Some(&schema_id) = schema_ids().get(&update.table) {
+            write_update(update, schema_id, writer, &event)?;
+        }
+    }
+
+    Ok(())
+}
+
+fn write_update(
+    update: &TableUpdate,
+    schema_id: i32,
+    writer: &mut RecordWriter,
+    event: &WriteEvent,
+) -> Result<(), Box<dyn Error>> {
+    // Create the Schema Registry wire format: [magic byte, schema ID (4 bytes, big-endian), data...].
+    let mut value = vec![0u8; 5];
+    value[0] = 0; // magic byte
+    value[1..5].copy_from_slice(&schema_id.to_be_bytes());
+
+    let data_bytes = serde_json::to_vec(&update.data)?;
+    value.extend_from_slice(&data_bytes);
+
+    let key = event.record.key().map(|k| k.to_vec());
+    let record = BorrowedRecord::new(key.as_deref(), Some(&value));
+
+    writer.write_with_options(record, WriteOptions::to_topic(&update.table))?;
+    Ok(())
+}
+----
+--
+
+JavaScript::
++
+--
+The JavaScript SDK does not support writing records to specific output topics. For multi-topic fan-out, use the Go or Rust SDK.
+--
+======
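+
+After you register the schemas and update the schema IDs, build and deploy the transform from the project directory with the standard `rpk` workflow. The input and output topics come from the `transform.yaml` shown above, so no extra flags are needed:
+
+[,bash]
+----
+# Compile the transform to WebAssembly.
+rpk transform build
+
+# Deploy it to the cluster; topics are read from transform.yaml.
+rpk transform deploy
+----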
 
 === Connect to the Schema Registry
 
 You can use the Schema Registry client library to read and write schemas, and to serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.
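+
+For example, with the Go SDK you can look up a schema once at startup and reuse it in the transform loop. The following is a minimal sketch only: it assumes the SDK exposes the client as an `sr` package with a `NewClient` constructor, a `LookupLatestSchema` method, and an `ID` field on the returned schema, so verify the exact import path and signatures against the SDK reference for your version:
+
+[,go]
+----
+package main
+
+import (
+    "log"
+
+    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
+    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/sr" // assumed import path
+)
+
+func main() {
+    // Assumed API: the client talks to the broker's built-in Schema Registry.
+    client := sr.NewClient()
+
+    // Assumed API: fetch the latest schema registered under the subject "orders-value".
+    schema, err := client.LookupLatestSchema("orders-value")
+    if err != nil {
+        log.Fatalf("Schema lookup failed: %v", err)
+    }
+    log.Printf("Using schema ID %d for orders-value", schema.ID) // assumed field name
+
+    transform.OnRecordWritten(func(event transform.WriteEvent, writer transform.RecordWriter) error {
+        // Validate or re-encode records against the fetched schema here.
+        return writer.Write(event.Record())
+    })
+}
+----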