diff --git a/sdks/go/pkg/beam/core/util/dot/dot.go b/sdks/go/pkg/beam/core/util/dot/dot.go index 727151551f9f..27c5e6bce2fe 100644 --- a/sdks/go/pkg/beam/core/util/dot/dot.go +++ b/sdks/go/pkg/beam/core/util/dot/dot.go @@ -14,6 +14,9 @@ // limitations under the License. // Package dot produces DOT graphs from Beam graph representations. +// +// Deprecated:This package is no longer used by the Beam Go SDK. +// It is slated for removal in a future Beam release. package dot import ( diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go index 3c31ba3a8da7..c5f3cc3c8d9c 100644 --- a/sdks/go/pkg/beam/runners/dot/dot.go +++ b/sdks/go/pkg/beam/runners/dot/dot.go @@ -21,13 +21,18 @@ import ( "bytes" "context" "flag" + "fmt" "os" + "sort" "github.com/apache/beam/sdks/v2/go/pkg/beam" - dotlib "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/dot" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) +var errNoComponents = errors.New("pipeline has no components") + func init() { beam.RegisterRunner("dot", Execute) } @@ -42,14 +47,74 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) return nil, errors.New("must supply dot_file argument") } - edges, nodes, err := p.Build() + edges, _, err := p.Build() if err != nil { return nil, errors.New("can't get data to render") } - var buf bytes.Buffer - if err := dotlib.Render(edges, nodes, &buf); err != nil { + pipeline, err := graphx.Marshal(edges, &graphx.Options{}) + if err != nil { return nil, err } + + var buf bytes.Buffer + buf.WriteString("digraph G {\n") + + components := pipeline.GetComponents() + if components == nil { + return nil, errNoComponents + } + + transforms := components.GetTransforms() + + // Build reverse input index: PCollectionID -> []TransformID + consumers := make(map[string][]string) + for tid, t := range transforms { + // Skip composite transforms + if len(t.GetSubtransforms()) != 0 { + continue + } + + for _, pcollID := range t.GetInputs() { + consumers[pcollID] = append(consumers[pcollID], tid) + } + } + + // Ensure deterministic ordering of consumer lists + for pcollID := range consumers { + sort.Strings(consumers[pcollID]) + } + + // Topologically sort transforms for deterministic emission. + // We use the same ordering utility as Prism to ensure stable and execution-consistent graph traversal. + roots := pipeline.GetRootTransformIds() + ordered := pipelinex.TopologicalSort(transforms, roots) + + // Generate edges + for _, tid := range ordered { + t := transforms[tid] + // Skip composite transforms + if len(t.GetSubtransforms()) != 0 { + continue + } + + from := t.GetUniqueName() + + for _, pcollID := range t.GetOutputs() { + for _, consumerID := range consumers[pcollID] { + + consumer, ok := transforms[consumerID] + if !ok { + continue // Defensively skip if the consumer transform is missing + } + + to := consumer.GetUniqueName() + fmt.Fprintf(&buf, "\"%s\" -> \"%s\";\n", from, to) + } + } + } + + buf.WriteString("}\n") + return nil, os.WriteFile(*dotFile, buf.Bytes(), 0644) } diff --git a/sdks/go/pkg/beam/runners/dot/dot_test.go b/sdks/go/pkg/beam/runners/dot/dot_test.go new file mode 100644 index 000000000000..acd2f232b58f --- /dev/null +++ b/sdks/go/pkg/beam/runners/dot/dot_test.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package dot + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" +) + +func TestDotRunner_GeneratesDeterministicOutput(t *testing.T) { + ctx := context.Background() + + // Create temporary DOT file + tmpFile, err := os.CreateTemp("", "dot_test_*.dot") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + defer os.Remove(tmpFile.Name()) + + // Set flag manually + *dotFile = tmpFile.Name() + + // Build simple pipeline + p, s := beam.NewPipelineWithRoot() + + col := beam.Create(s, "a", "b", "c") + passert.Count(s, col, "", 3) + + // Run with dot runner + _, err = Execute(ctx, p) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + // Read generated file + data, err := os.ReadFile(tmpFile.Name()) + if err != nil { + t.Fatalf("failed to read dot file: %v", err) + } + + content := string(data) + + if !strings.HasPrefix(content, "digraph G {") { + t.Fatalf("dot output missing header") + } + + if !strings.Contains(content, "->") { + t.Fatalf("dot output contains no edges") + } +}