From 7e6be92dc80466a9444928ff7a2030150186b75d Mon Sep 17 00:00:00 2001 From: Yousuf Ansari Date: Sat, 21 Feb 2026 06:32:30 +0000 Subject: [PATCH 1/3] [Go SDK] Rewrite dot runner to generate DOT from portable pipeline proto --- sdks/go/pkg/beam/runners/dot/dot.go | 60 +++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go index 3c31ba3a8da7..835a5317002d 100644 --- a/sdks/go/pkg/beam/runners/dot/dot.go +++ b/sdks/go/pkg/beam/runners/dot/dot.go @@ -21,10 +21,11 @@ import ( "bytes" "context" "flag" + "fmt" "os" "github.com/apache/beam/sdks/v2/go/pkg/beam" - dotlib "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/dot" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -42,14 +43,65 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) return nil, errors.New("must supply dot_file argument") } - edges, nodes, err := p.Build() + edges, _, err := p.Build() if err != nil { return nil, errors.New("can't get data to render") } - var buf bytes.Buffer - if err := dotlib.Render(edges, nodes, &buf); err != nil { + pipeline, err := graphx.Marshal(edges, &graphx.Options{}) + if err != nil { return nil, err } + + var buf bytes.Buffer + buf.WriteString("digraph G {\n") + + components := pipeline.GetComponents() + if components == nil { + return nil, errors.New("pipeline has no components") + } + + transforms := components.GetTransforms() + + // Build reverse input index: PCollectionID -> []TransformID + consumers := make(map[string][]string) + for tid, t := range transforms { + // Skip composite transforms + if len(t.GetSubtransforms()) != 0 { + continue + } + + for _, pcollID := range t.GetInputs() { + consumers[pcollID] = append(consumers[pcollID], tid) + } + } + + // Generate edges + for _, t := range transforms { + // Skip composite transforms + if 
len(t.GetSubtransforms()) != 0 { + continue + } + + from := t.GetUniqueName() + + for _, pcollID := range t.GetOutputs() { + for _, consumerID := range consumers[pcollID] { + + consumer := transforms[consumerID] + + // Skip composite consumers + if len(consumer.GetSubtransforms()) != 0 { + continue + } + + to := consumer.GetUniqueName() + fmt.Fprintf(&buf, "\"%s\" -> \"%s\";\n", from, to) + } + } + } + + buf.WriteString("}\n") + return nil, os.WriteFile(*dotFile, buf.Bytes(), 0644) } From 30926f4df8d2df2ef18b73a0a6e89bc628301b6f Mon Sep 17 00:00:00 2001 From: Yousuf Ansari Date: Wed, 25 Feb 2026 18:51:20 +0000 Subject: [PATCH 2/3] [Go SDK] dot runner: generate DOT from portable proto, skip composites, add deterministic ordering and test --- sdks/go/pkg/beam/runners/dot/dot.go | 24 +++++++++-- sdks/go/pkg/beam/runners/dot/dot_test.go | 53 ++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 sdks/go/pkg/beam/runners/dot/dot_test.go diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go index 835a5317002d..1af89057c6de 100644 --- a/sdks/go/pkg/beam/runners/dot/dot.go +++ b/sdks/go/pkg/beam/runners/dot/dot.go @@ -23,12 +23,16 @@ import ( "flag" "fmt" "os" + "sort" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) +var errNoComponents = errors.New("pipeline has no components") + func init() { beam.RegisterRunner("dot", Execute) } @@ -58,7 +62,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) components := pipeline.GetComponents() if components == nil { - return nil, errors.New("pipeline has no components") + return nil, errNoComponents } transforms := components.GetTransforms() @@ -76,8 +80,19 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) } } + // Ensure deterministic ordering of consumer lists + for pcollID := range consumers { + sort.Strings(consumers[pcollID]) + } + + // Topologically sort transforms for deterministic emission. + // We use the same ordering utility as Prism to ensure stable and execution-consistent graph traversal. + roots := pipeline.GetRootTransformIds() + ordered := pipelinex.TopologicalSort(transforms, roots) + // Generate edges - for _, t := range transforms { + for _, tid := range ordered { + t := transforms[tid] // Skip composite transforms if len(t.GetSubtransforms()) != 0 { continue @@ -88,7 +103,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) for _, pcollID := range t.GetOutputs() { for _, consumerID := range consumers[pcollID] { - consumer := transforms[consumerID] + consumer, ok := transforms[consumerID] + if !ok { + continue // Defensively skip if the consumer transform is missing + } // Skip composite consumers if len(consumer.GetSubtransforms()) != 0 { diff --git a/sdks/go/pkg/beam/runners/dot/dot_test.go b/sdks/go/pkg/beam/runners/dot/dot_test.go new file mode 100644 index 000000000000..bae78a36e5db --- /dev/null +++ b/sdks/go/pkg/beam/runners/dot/dot_test.go @@ -0,0 +1,53 @@ +package dot + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" +) + +func TestDotRunner_GeneratesDeterministicOutput(t *testing.T) { + ctx := context.Background() + + // Create temporary DOT file + tmpFile, err := os.CreateTemp("", "dot_test_*.dot") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + + // Set flag manually + *dotFile = tmpFile.Name() + + // Build simple pipeline + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, "a", "b", "c") + passert.Count(s, col, "", 3) + + // Run with dot runner + _, err = Execute(ctx, p) + if err != nil { + t.Fatalf("Execute 
failed: %v", err) + } + + // Read generated file + data, err := os.ReadFile(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to read dot file: %v", err) + } + + content := string(data) + + if !strings.HasPrefix(content, "digraph G {") { + t.Fatalf("dot output missing header") + } + + if !strings.Contains(content, "->") { + t.Fatalf("dot output contains no edges") + } +} From 8c826f00057278cfda0f9f8c67d6b155fd79e6f0 Mon Sep 17 00:00:00 2001 From: Yousuf Ansari Date: Wed, 25 Feb 2026 19:17:17 +0000 Subject: [PATCH 3/3] [Go SDK] DOT runner: Added License Header, Marked package dot as deprecated and removed an unreachable block of test code --- sdks/go/pkg/beam/core/util/dot/dot.go | 3 +++ sdks/go/pkg/beam/runners/dot/dot.go | 5 ----- sdks/go/pkg/beam/runners/dot/dot_test.go | 14 ++++++++++++++ 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/core/util/dot/dot.go b/sdks/go/pkg/beam/core/util/dot/dot.go index 727151551f9f..27c5e6bce2fe 100644 --- a/sdks/go/pkg/beam/core/util/dot/dot.go +++ b/sdks/go/pkg/beam/core/util/dot/dot.go @@ -14,6 +14,9 @@ // limitations under the License. // Package dot produces DOT graphs from Beam graph representations. +// +// Deprecated: This package is no longer used by the Beam Go SDK. +// It is slated for removal in a future Beam release. 
package dot import ( diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go index 1af89057c6de..c5f3cc3c8d9c 100644 --- a/sdks/go/pkg/beam/runners/dot/dot.go +++ b/sdks/go/pkg/beam/runners/dot/dot.go @@ -108,11 +108,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) continue // Defensively skip if the consumer transform is missing } - // Skip composite consumers - if len(consumer.GetSubtransforms()) != 0 { - continue - } - to := consumer.GetUniqueName() fmt.Fprintf(&buf, "\"%s\" -> \"%s\";\n", from, to) } diff --git a/sdks/go/pkg/beam/runners/dot/dot_test.go b/sdks/go/pkg/beam/runners/dot/dot_test.go index bae78a36e5db..acd2f232b58f 100644 --- a/sdks/go/pkg/beam/runners/dot/dot_test.go +++ b/sdks/go/pkg/beam/runners/dot/dot_test.go @@ -1,3 +1,17 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package dot import (