From 3a236bb5d3094babe61b8befc6fd33f12a5fb21e Mon Sep 17 00:00:00 2001 From: PG1204 Date: Fri, 19 Jun 2026 12:23:16 -0700 Subject: [PATCH 1/3] refactor(frontend): capture per-operator performance metrics in WorkflowStatusService --- .../performance-metrics.spec.ts | 217 ++++++++++++++++++ .../workflow-status/performance-metrics.ts | 160 +++++++++++++ .../workflow-status.service.spec.ts | 165 +++++++++++++ .../workflow-status.service.ts | 44 +++- .../types/execute-workflow.interface.ts | 5 + 5 files changed, 589 insertions(+), 2 deletions(-) create mode 100644 frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts create mode 100644 frontend/src/app/workspace/service/workflow-status/performance-metrics.ts create mode 100644 frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts new file mode 100644 index 00000000000..21695143da3 --- /dev/null +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { + HeatmapView, + OperatorPerformanceMetrics, + normalizeScores, + rawMetricForView, + toPerformanceMetrics, +} from "./performance-metrics"; +import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface"; + +/** + * A complete statistics object, mirroring what the backend sends once every + * field is typed. All five timing/size fields are present. + */ +const fullStats: OperatorStatistics = { + operatorState: OperatorState.Running, + aggregatedInputRowCount: 1_000_000, + aggregatedInputSize: 84_000_000, + inputPortMetrics: { "0": 1_000_000 }, + aggregatedOutputRowCount: 12_000, + aggregatedOutputSize: 1_010_000, + outputPortMetrics: { "0": 12_000 }, + numWorkers: 4, + aggregatedDataProcessingTime: 8_500_000_000, + aggregatedControlProcessingTime: 120_000_000, + aggregatedIdleTime: 300_000_000, +}; + +/** + * The partial shape produced by WorkflowStatusService.resetStatus(): only the + * required fields, none of the five optional timing/size fields, no numWorkers. + * The mapper must survive this without emitting NaN/undefined. + */ +const partialStats: OperatorStatistics = { + operatorState: OperatorState.Uninitialized, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + aggregatedOutputRowCount: 0, + outputPortMetrics: {}, +}; + +/** Build an OperatorPerformanceMetrics for the rawMetricForView tests. */ +function makeMetrics(overrides: Partial): OperatorPerformanceMetrics { + return { + operatorId: "op", + dataProcessingTimeNs: 0, + controlProcessingTimeNs: 0, + idleTimeNs: 0, + inputRows: 0, + outputRows: 0, + inputSize: 0, + outputSize: 0, + numWorkers: 0, + ...overrides, + }; +} + +describe("toPerformanceMetrics", () => { + it("maps every field from a full statistics object", () => { + const m = toPerformanceMetrics("Filter-operator-a1b2", fullStats); + expect(m).toEqual({ + operatorId: "Filter-operator-a1b2", + dataProcessingTimeNs: 8_500_000_000, + controlProcessingTimeNs: 120_000_000, + idleTimeNs: 300_000_000, + inputRows: 1_000_000, + outputRows: 12_000, + inputSize: 84_000_000, + outputSize: 1_010_000, + numWorkers: 4, + }); + }); + + it("keeps data and control processing time as separate fields", () => { + const m = toPerformanceMetrics("op", fullStats); + expect(m.dataProcessingTimeNs).toBe(fullStats.aggregatedDataProcessingTime); + expect(m.controlProcessingTimeNs).toBe(fullStats.aggregatedControlProcessingTime); + }); + + it("defaults every optional/missing field to 0 (no NaN, no undefined)", () => { + const m = toPerformanceMetrics("op", partialStats); + expect(m).toEqual({ + operatorId: "op", + dataProcessingTimeNs: 0, + controlProcessingTimeNs: 0, + idleTimeNs: 0, + inputRows: 0, + outputRows: 0, + inputSize: 0, + outputSize: 0, + numWorkers: 0, + }); + // explicit guard: nothing leaked through as NaN + for (const value of Object.values(m)) { + if (typeof value === "number") { + expect(Number.isNaN(value)).toBe(false); + } + } + }); + + it("preserves a unicode operator id verbatim", () => { + const id = "算子-✓-1"; + expect(toPerformanceMetrics(id, fullStats).operatorId).toBe(id); + }); +}); + +describe("rawMetricForView", () => { + // Bottleneck-oriented semantics: a higher raw cost means "hotter" (more of a + // problem), before normalization. + + it("Runtime returns the data processing time (hotter = slower)", () => { + const m = makeMetrics({ dataProcessingTimeNs: 8_500_000_000 }); + expect(rawMetricForView(m, HeatmapView.Runtime)).toBe(8_500_000_000); + }); + + it("Throughput inverts output rows so slow producers are hot", () => { + const m = makeMetrics({ outputRows: 4 }); + expect(rawMetricForView(m, HeatmapView.Throughput)).toBe(0.25); + }); + + it("Throughput returns 0 when there is no output (no divide-by-zero)", () => { + const m = makeMetrics({ outputRows: 0 }); + const score = rawMetricForView(m, HeatmapView.Throughput); + expect(score).toBe(0); + expect(Number.isFinite(score)).toBe(true); + }); + + it("IoImbalance scores the row-drop rate (1 - out/in)", () => { + const m = makeMetrics({ inputRows: 1_000, outputRows: 250 }); + expect(rawMetricForView(m, HeatmapView.IoImbalance)).toBe(0.75); + }); + + it("IoImbalance returns 0 for an amplifier (out > in), clamped to [0,1]", () => { + const m = makeMetrics({ inputRows: 100, outputRows: 500 }); + expect(rawMetricForView(m, HeatmapView.IoImbalance)).toBe(0); + }); + + it("IoImbalance returns 0 when inputRows is 0 (no divide-by-zero)", () => { + const m = makeMetrics({ inputRows: 0, outputRows: 250 }); + const score = rawMetricForView(m, HeatmapView.IoImbalance); + expect(score).toBe(0); + expect(Number.isFinite(score)).toBe(true); + }); +}); + +describe("normalizeScores", () => { + it("returns an empty object for empty input", () => { + expect(normalizeScores({})).toEqual({}); + }); + + it("scores a single operator as 1 (it is the hottest)", () => { + expect(normalizeScores({ a: 42 })).toEqual({ a: 1 }); + }); + + it("scores all-equal values as 0.5 (avoids divide-by-zero)", () => { + expect(normalizeScores({ a: 5, b: 5, c: 5 })).toEqual({ a: 0.5, b: 0.5, c: 0.5 }); + }); + + it("scores all-zero values as 0.5", () => { + expect(normalizeScores({ a: 0, b: 0 })).toEqual({ a: 0.5, b: 0.5 }); + }); + + it("maps the min to 0 and the max to 1 for two distinct values", () => { + const scores = normalizeScores({ low: 1, high: 100 }); + expect(scores["low"]).toBe(0); + expect(scores["high"]).toBe(1); + }); + + it("keeps all scores within [0, 1]", () => { + const scores = normalizeScores({ a: 3, b: 50, c: 900, d: 12 }); + for (const s of Object.values(scores)) { + expect(s).toBeGreaterThanOrEqual(0); + expect(s).toBeLessThanOrEqual(1); + } + }); + + it("compresses heavy-tailed values so the middle is not flattened to ~0", () => { + // With plain linear min-max, 100 would map to (100-1)/(1000-1) ≈ 0.1. + // Log scaling lifts the middle well above 0.5, which is the point. + const scores = normalizeScores({ small: 1, mid: 100, big: 1000 }); + expect(scores["small"]).toBe(0); + expect(scores["big"]).toBe(1); + expect(scores["mid"]).toBeGreaterThan(0.5); + }); + + it("treats non-finite raw values as 0 rather than propagating NaN/Infinity", () => { + const scores = normalizeScores({ bad: Number.POSITIVE_INFINITY, worse: NaN, good: 100 }); + for (const s of Object.values(scores)) { + expect(Number.isFinite(s)).toBe(true); + } + // the only real magnitude wins the top of the scale + expect(scores["good"]).toBe(1); + }); + + it("preserves unicode operator ids as keys", () => { + const scores = normalizeScores({ "算子-✓": 10, b: 20 }); + expect(Object.keys(scores).sort()).toEqual(["b", "算子-✓"].sort()); + }); +}); diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts new file mode 100644 index 00000000000..a56fd835242 --- /dev/null +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { OperatorStatistics } from "../../types/execute-workflow.interface"; + +/** + * Derived, normalized-ready per-operator performance metrics. + * + * This is the ground-truth model consumed by the workflow heat-map overlay. It + * is a flat, defensively-defaulted projection of the raw {@link OperatorStatistics} + * the backend streams over the websocket — every field is a finite, non-negative + * number so downstream scoring never has to re-validate. + */ +export interface OperatorPerformanceMetrics + extends Readonly<{ + operatorId: string; + dataProcessingTimeNs: number; + controlProcessingTimeNs: number; + idleTimeNs: number; + inputRows: number; + outputRows: number; + inputSize: number; + outputSize: number; + numWorkers: number; + }> {} + +/** + * The three heat-map views. Each answers a different "where is the problem?" + * question; see {@link rawMetricForView} for the per-operator cost each one uses. + * String-valued so the selection serializes readably (e.g. into localStorage). + */ +export enum HeatmapView { + Runtime = "runtime", + Throughput = "throughput", + IoImbalance = "ioImbalance", +} + +/** + * Coerce an untrusted numeric field (it arrives over the websocket) into a + * finite, non-negative number. Anything missing, non-numeric, NaN, infinite, or + * negative collapses to 0 so no NaN/Infinity can leak into the scoring math. + */ +function toFiniteNonNegative(value: number | undefined): number { + return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : 0; +} + +function clamp(value: number, min: number, max: number): number { + return Math.min(max, Math.max(min, value)); +} + +/** + * Project a single raw {@link OperatorStatistics} into the flat performance model, + * defaulting every optional/missing field to 0. Data and control processing time + * are kept separate (the Runtime view uses data time only). + */ +export function toPerformanceMetrics(operatorId: string, stats: OperatorStatistics): OperatorPerformanceMetrics { + return { + operatorId, + dataProcessingTimeNs: toFiniteNonNegative(stats.aggregatedDataProcessingTime), + controlProcessingTimeNs: toFiniteNonNegative(stats.aggregatedControlProcessingTime), + idleTimeNs: toFiniteNonNegative(stats.aggregatedIdleTime), + inputRows: toFiniteNonNegative(stats.aggregatedInputRowCount), + outputRows: toFiniteNonNegative(stats.aggregatedOutputRowCount), + inputSize: toFiniteNonNegative(stats.aggregatedInputSize), + outputSize: toFiniteNonNegative(stats.aggregatedOutputSize), + numWorkers: toFiniteNonNegative(stats.numWorkers), + }; +} + +/** + * Per-operator raw cost for a view, BEFORE normalization. Bottleneck-oriented: + * a higher cost means "hotter" (more of a problem). + * + * - Runtime: data processing time — slower operators are hotter. + * - Throughput: 1 / outputRows — slow producers are hotter; no output -> 0 (cold). + * - IoImbalance: clamp(1 - out/in) — row-dropping operators are hotter; an + * amplifier (out > in) or a missing input clamps to 0 (cold). + * + * The metrics are already finite and non-negative (see {@link toPerformanceMetrics}), + * so this never produces NaN/Infinity. + */ +export function rawMetricForView(metrics: OperatorPerformanceMetrics, view: HeatmapView): number { + switch (view) { + case HeatmapView.Runtime: + return metrics.dataProcessingTimeNs; + case HeatmapView.Throughput: + return metrics.outputRows > 0 ? 1 / metrics.outputRows : 0; + case HeatmapView.IoImbalance: + return metrics.inputRows <= 0 ? 0 : clamp(1 - metrics.outputRows / metrics.inputRows, 0, 1); + default: + return 0; + } +} + +/** + * Normalize per-operator raw costs into [0, 1] heat scores. + * + * Uses log1p compression then min-max across operators, so a single dominant + * operator does not flatten everyone else toward 0. Rules: + * - empty input -> {} + * - single operator -> 1 if it did measurable work, else 0.5 (neutral) + * - all values equal -> 0.5 for everyone (no spread to show; avoids /0) + * - otherwise -> min maps to 0, max maps to 1, rest interpolated + * + * Non-finite / negative raw costs are treated as 0 before scoring. + */ +export function normalizeScores(rawById: Record): Record { + const operatorIds = Object.keys(rawById); + if (operatorIds.length === 0) { + return {}; + } + + // Defensive coercion + log1p compression in one pass. + const compressed: Record = {}; + for (const operatorId of operatorIds) { + compressed[operatorId] = Math.log1p(toFiniteNonNegative(rawById[operatorId])); + } + + if (operatorIds.length === 1) { + const onlyId = operatorIds[0]; + return { [onlyId]: compressed[onlyId] > 0 ? 1 : 0.5 }; + } + + const values = Object.values(compressed); + const min = Math.min(...values); + const max = Math.max(...values); + + // All operators have the same cost (covers the all-zero case): no spread to + // render, so paint everyone neutral rather than dividing by zero. + if (max === min) { + const neutral: Record = {}; + for (const operatorId of operatorIds) { + neutral[operatorId] = 0.5; + } + return neutral; + } + + const range = max - min; + const scores: Record = {}; + for (const operatorId of operatorIds) { + scores[operatorId] = clamp((compressed[operatorId] - min) / range, 0, 1); + } + return scores; +} diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts new file mode 100644 index 00000000000..fb1614c9e0a --- /dev/null +++ b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TestBed } from "@angular/core/testing"; +import { Subject } from "rxjs"; +import { WorkflowStatusService } from "./workflow-status.service"; +import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; +import { OperatorPerformanceMetrics } from "./performance-metrics"; +import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface"; +import { TexeraWebsocketEvent } from "../../types/workflow-websocket.interface"; + +const sampleStats: OperatorStatistics = { + operatorState: OperatorState.Running, + aggregatedInputRowCount: 1_000, + aggregatedInputSize: 8_000, + inputPortMetrics: { "0": 1_000 }, + aggregatedOutputRowCount: 250, + aggregatedOutputSize: 2_000, + outputPortMetrics: { "0": 250 }, + numWorkers: 2, + aggregatedDataProcessingTime: 5_000_000, + aggregatedControlProcessingTime: 1_000_000, + aggregatedIdleTime: 700_000, +}; + +function statsEvent(operatorStatistics: Record): TexeraWebsocketEvent { + return { type: "OperatorStatisticsUpdateEvent", operatorStatistics } as TexeraWebsocketEvent; +} + +describe("WorkflowStatusService", () => { + let service: WorkflowStatusService; + let websocketEventSubject: Subject; + + beforeEach(() => { + websocketEventSubject = new Subject(); + const websocketStub: Partial = { + websocketEvent: () => websocketEventSubject.asObservable(), + }; + TestBed.configureTestingModule({ + providers: [WorkflowStatusService, { provide: WorkflowWebsocketService, useValue: websocketStub }], + }); + service = TestBed.inject(WorkflowStatusService); + }); + + it("forwards an OperatorStatisticsUpdateEvent to the status stream", () => { + const received: Record[] = []; + service.getStatusUpdateStream().subscribe(s => received.push(s)); + + websocketEventSubject.next(statsEvent({ op1: sampleStats })); + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ op1: sampleStats }); + expect(service.getCurrentStatus()).toEqual({ op1: sampleStats }); + }); + + it("ignores websocket events of other types", () => { + const received: Record[] = []; + service.getStatusUpdateStream().subscribe(s => received.push(s)); + + websocketEventSubject.next({ type: "WorkflowErrorEvent" } as unknown as TexeraWebsocketEvent); + + expect(received).toHaveLength(0); + expect(service.getCurrentStatus()).toEqual({}); + }); + + it("derives performance metrics from a status update", () => { + const emissions: Record[] = []; + service.getPerformanceMetricsStream().subscribe(m => emissions.push(m)); + + websocketEventSubject.next(statsEvent({ op1: sampleStats })); + + // BehaviorSubject seeds {} then emits the derived map. + const latest = emissions[emissions.length - 1]; + expect(latest["op1"]).toEqual({ + operatorId: "op1", + dataProcessingTimeNs: 5_000_000, + controlProcessingTimeNs: 1_000_000, + idleTimeNs: 700_000, + inputRows: 1_000, + outputRows: 250, + inputSize: 8_000, + outputSize: 2_000, + numWorkers: 2, + }); + expect(service.getCurrentPerformanceMetrics()).toEqual(latest); + }); + + it("seeds the performance-metrics stream with an empty map for late subscribers", () => { + expect(service.getCurrentPerformanceMetrics()).toEqual({}); + let seeded: Record | undefined; + service.getPerformanceMetricsStream().subscribe(m => (seeded = m)); + expect(seeded).toEqual({}); + }); + + it("defaults missing optional fields to 0 when deriving metrics", () => { + const partial: OperatorStatistics = { + operatorState: OperatorState.Uninitialized, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + aggregatedOutputRowCount: 0, + outputPortMetrics: {}, + }; + websocketEventSubject.next(statsEvent({ op1: partial })); + + const m = service.getCurrentPerformanceMetrics()["op1"]; + expect(m.dataProcessingTimeNs).toBe(0); + expect(m.controlProcessingTimeNs).toBe(0); + expect(m.idleTimeNs).toBe(0); + expect(m.inputSize).toBe(0); + expect(m.numWorkers).toBe(0); + }); + + it("setExternalStatus feeds both the status and performance-metrics streams", () => { + const statuses: Record[] = []; + service.getStatusUpdateStream().subscribe(s => statuses.push(s)); + + service.setExternalStatus({ historical: sampleStats }); + + expect(statuses[statuses.length - 1]).toEqual({ historical: sampleStats }); + expect(service.getCurrentStatus()).toEqual({ historical: sampleStats }); + expect(service.getCurrentPerformanceMetrics()["historical"].dataProcessingTimeNs).toBe(5_000_000); + }); + + it("resetStatus zeros the metrics for known operators", () => { + websocketEventSubject.next(statsEvent({ op1: sampleStats })); + service.resetStatus(); + + const m = service.getCurrentPerformanceMetrics()["op1"]; + expect(m).toEqual({ + operatorId: "op1", + dataProcessingTimeNs: 0, + controlProcessingTimeNs: 0, + idleTimeNs: 0, + inputRows: 0, + outputRows: 0, + inputSize: 0, + outputSize: 0, + numWorkers: 0, + }); + }); + + it("clearStatus empties both the status and performance-metrics snapshots", () => { + websocketEventSubject.next(statsEvent({ op1: sampleStats })); + service.clearStatus(); + + expect(service.getCurrentStatus()).toEqual({}); + expect(service.getCurrentPerformanceMetrics()).toEqual({}); + }); +}); diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts index e939932aeba..5252900d244 100644 --- a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts +++ b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts @@ -18,9 +18,10 @@ */ import { Injectable } from "@angular/core"; -import { Observable, Subject } from "rxjs"; +import { BehaviorSubject, Observable, Subject } from "rxjs"; import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface"; import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; +import { OperatorPerformanceMetrics, toPerformanceMetrics } from "./performance-metrics"; @Injectable({ providedIn: "root", @@ -30,8 +31,18 @@ export class WorkflowStatusService { private statusSubject = new Subject>(); private currentStatus: Record = {}; + // Derived, ground-truth performance metrics for the heat-map overlay. Backed by + // a BehaviorSubject so a consumer that subscribes after a run already streamed + // (e.g. the overlay toggled on mid/post-run) still receives the latest value. + private performanceMetricsSubject = new BehaviorSubject>({}); + constructor(private workflowWebsocketService: WorkflowWebsocketService) { - this.getStatusUpdateStream().subscribe(event => (this.currentStatus = event)); + // Single derivation path: every status emission (websocket, reset, clear, or + // externally fed historical stats) recomputes the performance metrics. + this.getStatusUpdateStream().subscribe(status => { + this.currentStatus = status; + this.performanceMetricsSubject.next(this.buildPerformanceMetrics(status)); + }); this.workflowWebsocketService.websocketEvent().subscribe(event => { if (event.type !== "OperatorStatisticsUpdateEvent") { @@ -49,6 +60,35 @@ export class WorkflowStatusService { return this.currentStatus; } + /** Stream of derived per-operator performance metrics, keyed by operator id. */ + public getPerformanceMetricsStream(): Observable> { + return this.performanceMetricsSubject.asObservable(); + } + + /** Synchronous snapshot of the latest derived per-operator performance metrics. */ + public getCurrentPerformanceMetrics(): Record { + return this.performanceMetricsSubject.getValue(); + } + + /** + * Feed externally-sourced statistics (e.g. restored historical runtime stats) + * through the same pipeline as live websocket updates, so derived metrics and + * all subscribers update identically. + */ + public setExternalStatus(status: Record): void { + this.statusSubject.next(status); + } + + private buildPerformanceMetrics( + status: Record + ): Record { + const metrics: Record = {}; + for (const operatorId of Object.keys(status)) { + metrics[operatorId] = toPerformanceMetrics(operatorId, status[operatorId]); + } + return metrics; + } + public resetStatus(): void { const initStatus: Record = Object.keys(this.currentStatus).reduce( (accumulator, operatorId) => { diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 9b6edb00b29..1fc99e7a605 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -82,10 +82,15 @@ export interface OperatorStatistics extends Readonly<{ operatorState: OperatorState; aggregatedInputRowCount: number; + aggregatedInputSize?: number; inputPortMetrics: Record; aggregatedOutputRowCount: number; + aggregatedOutputSize?: number; outputPortMetrics: Record; numWorkers?: number; + aggregatedDataProcessingTime?: number; + aggregatedControlProcessingTime?: number; + aggregatedIdleTime?: number; }> {} export interface OperatorStatsUpdate From 47b4dafa28e66b6a424d575697ddfd6715e77b39 Mon Sep 17 00:00:00 2001 From: PG1204 Date: Sun, 21 Jun 2026 15:40:06 -0700 Subject: [PATCH 2/3] refactor(frontend): move heat-map view scoring out of metrics capture --- .../performance-metrics.spec.ts | 135 +++--------------- .../workflow-status/performance-metrics.ts | 103 +------------ 2 files changed, 23 insertions(+), 215 deletions(-) diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts index 21695143da3..9de496c3e17 100644 --- a/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts @@ -17,13 +17,7 @@ * under the License. */ -import { - HeatmapView, - OperatorPerformanceMetrics, - normalizeScores, - rawMetricForView, - toPerformanceMetrics, -} from "./performance-metrics"; +import { OperatorPerformanceMetrics, toPerformanceMetrics } from "./performance-metrics"; import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface"; /** @@ -57,25 +51,9 @@ const partialStats: OperatorStatistics = { outputPortMetrics: {}, }; -/** Build an OperatorPerformanceMetrics for the rawMetricForView tests. */ -function makeMetrics(overrides: Partial): OperatorPerformanceMetrics { - return { - operatorId: "op", - dataProcessingTimeNs: 0, - controlProcessingTimeNs: 0, - idleTimeNs: 0, - inputRows: 0, - outputRows: 0, - inputSize: 0, - outputSize: 0, - numWorkers: 0, - ...overrides, - }; -} - describe("toPerformanceMetrics", () => { it("maps every field from a full statistics object", () => { - const m = toPerformanceMetrics("Filter-operator-a1b2", fullStats); + const m: OperatorPerformanceMetrics = toPerformanceMetrics("Filter-operator-a1b2", fullStats); expect(m).toEqual({ operatorId: "Filter-operator-a1b2", dataProcessingTimeNs: 8_500_000_000, @@ -116,102 +94,23 @@ describe("toPerformanceMetrics", () => { } }); + it("coerces non-finite / negative inputs to 0", () => { + const malformed = { + operatorState: OperatorState.Running, + aggregatedInputRowCount: Number.POSITIVE_INFINITY, + inputPortMetrics: {}, + aggregatedOutputRowCount: -5, + outputPortMetrics: {}, + aggregatedDataProcessingTime: Number.NaN, + } as unknown as OperatorStatistics; + const m = toPerformanceMetrics("op", malformed); + expect(m.inputRows).toBe(0); + expect(m.outputRows).toBe(0); + expect(m.dataProcessingTimeNs).toBe(0); + }); + it("preserves a unicode operator id verbatim", () => { const id = "算子-✓-1"; expect(toPerformanceMetrics(id, fullStats).operatorId).toBe(id); }); }); - -describe("rawMetricForView", () => { - // Bottleneck-oriented semantics: a higher raw cost means "hotter" (more of a - // problem), before normalization. - - it("Runtime returns the data processing time (hotter = slower)", () => { - const m = makeMetrics({ dataProcessingTimeNs: 8_500_000_000 }); - expect(rawMetricForView(m, HeatmapView.Runtime)).toBe(8_500_000_000); - }); - - it("Throughput inverts output rows so slow producers are hot", () => { - const m = makeMetrics({ outputRows: 4 }); - expect(rawMetricForView(m, HeatmapView.Throughput)).toBe(0.25); - }); - - it("Throughput returns 0 when there is no output (no divide-by-zero)", () => { - const m = makeMetrics({ outputRows: 0 }); - const score = rawMetricForView(m, HeatmapView.Throughput); - expect(score).toBe(0); - expect(Number.isFinite(score)).toBe(true); - }); - - it("IoImbalance scores the row-drop rate (1 - out/in)", () => { - const m = makeMetrics({ inputRows: 1_000, outputRows: 250 }); - expect(rawMetricForView(m, HeatmapView.IoImbalance)).toBe(0.75); - }); - - it("IoImbalance returns 0 for an amplifier (out > in), clamped to [0,1]", () => { - const m = makeMetrics({ inputRows: 100, outputRows: 500 }); - expect(rawMetricForView(m, HeatmapView.IoImbalance)).toBe(0); - }); - - it("IoImbalance returns 0 when inputRows is 0 (no divide-by-zero)", () => { - const m = makeMetrics({ inputRows: 0, outputRows: 250 }); - const score = rawMetricForView(m, HeatmapView.IoImbalance); - expect(score).toBe(0); - expect(Number.isFinite(score)).toBe(true); - }); -}); - -describe("normalizeScores", () => { - it("returns an empty object for empty input", () => { - expect(normalizeScores({})).toEqual({}); - }); - - it("scores a single operator as 1 (it is the hottest)", () => { - expect(normalizeScores({ a: 42 })).toEqual({ a: 1 }); - }); - - it("scores all-equal values as 0.5 (avoids divide-by-zero)", () => { - expect(normalizeScores({ a: 5, b: 5, c: 5 })).toEqual({ a: 0.5, b: 0.5, c: 0.5 }); - }); - - it("scores all-zero values as 0.5", () => { - expect(normalizeScores({ a: 0, b: 0 })).toEqual({ a: 0.5, b: 0.5 }); - }); - - it("maps the min to 0 and the max to 1 for two distinct values", () => { - const scores = normalizeScores({ low: 1, high: 100 }); - expect(scores["low"]).toBe(0); - expect(scores["high"]).toBe(1); - }); - - it("keeps all scores within [0, 1]", () => { - const scores = normalizeScores({ a: 3, b: 50, c: 900, d: 12 }); - for (const s of Object.values(scores)) { - expect(s).toBeGreaterThanOrEqual(0); - expect(s).toBeLessThanOrEqual(1); - } - }); - - it("compresses heavy-tailed values so the middle is not flattened to ~0", () => { - // With plain linear min-max, 100 would map to (100-1)/(1000-1) ≈ 0.1. - // Log scaling lifts the middle well above 0.5, which is the point. - const scores = normalizeScores({ small: 1, mid: 100, big: 1000 }); - expect(scores["small"]).toBe(0); - expect(scores["big"]).toBe(1); - expect(scores["mid"]).toBeGreaterThan(0.5); - }); - - it("treats non-finite raw values as 0 rather than propagating NaN/Infinity", () => { - const scores = normalizeScores({ bad: Number.POSITIVE_INFINITY, worse: NaN, good: 100 }); - for (const s of Object.values(scores)) { - expect(Number.isFinite(s)).toBe(true); - } - // the only real magnitude wins the top of the scale - expect(scores["good"]).toBe(1); - }); - - it("preserves unicode operator ids as keys", () => { - const scores = normalizeScores({ "算子-✓": 10, b: 20 }); - expect(Object.keys(scores).sort()).toEqual(["b", "算子-✓"].sort()); - }); -}); diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts index a56fd835242..e317cbf24d3 100644 --- a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts @@ -20,12 +20,12 @@ import { OperatorStatistics } from "../../types/execute-workflow.interface"; /** - * Derived, normalized-ready per-operator performance metrics. + * Derived per-operator performance metrics. * - * This is the ground-truth model consumed by the workflow heat-map overlay. It - * is a flat, defensively-defaulted projection of the raw {@link OperatorStatistics} + * This is the ground-truth model captured by {@link WorkflowStatusService}. It is + * a flat, defensively-defaulted projection of the raw {@link OperatorStatistics} * the backend streams over the websocket — every field is a finite, non-negative - * number so downstream scoring never has to re-validate. + * number, so downstream consumers never have to re-validate. */ export interface OperatorPerformanceMetrics extends Readonly<{ @@ -40,34 +40,19 @@ export interface OperatorPerformanceMetrics numWorkers: number; }> {} -/** - * The three heat-map views. Each answers a different "where is the problem?" - * question; see {@link rawMetricForView} for the per-operator cost each one uses. - * String-valued so the selection serializes readably (e.g. into localStorage). - */ -export enum HeatmapView { - Runtime = "runtime", - Throughput = "throughput", - IoImbalance = "ioImbalance", -} - /** * Coerce an untrusted numeric field (it arrives over the websocket) into a * finite, non-negative number. Anything missing, non-numeric, NaN, infinite, or - * negative collapses to 0 so no NaN/Infinity can leak into the scoring math. + * negative collapses to 0 so no NaN/Infinity can leak into downstream consumers. */ function toFiniteNonNegative(value: number | undefined): number { return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : 0; } -function clamp(value: number, min: number, max: number): number { - return Math.min(max, Math.max(min, value)); -} - /** * Project a single raw {@link OperatorStatistics} into the flat performance model, * defaulting every optional/missing field to 0. Data and control processing time - * are kept separate (the Runtime view uses data time only). + * are kept as separate fields. */ export function toPerformanceMetrics(operatorId: string, stats: OperatorStatistics): OperatorPerformanceMetrics { return { @@ -82,79 +67,3 @@ export function toPerformanceMetrics(operatorId: string, stats: OperatorStatisti numWorkers: toFiniteNonNegative(stats.numWorkers), }; } - -/** - * Per-operator raw cost for a view, BEFORE normalization. Bottleneck-oriented: - * a higher cost means "hotter" (more of a problem). - * - * - Runtime: data processing time — slower operators are hotter. - * - Throughput: 1 / outputRows — slow producers are hotter; no output -> 0 (cold). - * - IoImbalance: clamp(1 - out/in) — row-dropping operators are hotter; an - * amplifier (out > in) or a missing input clamps to 0 (cold). - * - * The metrics are already finite and non-negative (see {@link toPerformanceMetrics}), - * so this never produces NaN/Infinity. - */ -export function rawMetricForView(metrics: OperatorPerformanceMetrics, view: HeatmapView): number { - switch (view) { - case HeatmapView.Runtime: - return metrics.dataProcessingTimeNs; - case HeatmapView.Throughput: - return metrics.outputRows > 0 ? 1 / metrics.outputRows : 0; - case HeatmapView.IoImbalance: - return metrics.inputRows <= 0 ? 0 : clamp(1 - metrics.outputRows / metrics.inputRows, 0, 1); - default: - return 0; - } -} - -/** - * Normalize per-operator raw costs into [0, 1] heat scores. - * - * Uses log1p compression then min-max across operators, so a single dominant - * operator does not flatten everyone else toward 0. Rules: - * - empty input -> {} - * - single operator -> 1 if it did measurable work, else 0.5 (neutral) - * - all values equal -> 0.5 for everyone (no spread to show; avoids /0) - * - otherwise -> min maps to 0, max maps to 1, rest interpolated - * - * Non-finite / negative raw costs are treated as 0 before scoring. - */ -export function normalizeScores(rawById: Record): Record { - const operatorIds = Object.keys(rawById); - if (operatorIds.length === 0) { - return {}; - } - - // Defensive coercion + log1p compression in one pass. - const compressed: Record = {}; - for (const operatorId of operatorIds) { - compressed[operatorId] = Math.log1p(toFiniteNonNegative(rawById[operatorId])); - } - - if (operatorIds.length === 1) { - const onlyId = operatorIds[0]; - return { [onlyId]: compressed[onlyId] > 0 ? 1 : 0.5 }; - } - - const values = Object.values(compressed); - const min = Math.min(...values); - const max = Math.max(...values); - - // All operators have the same cost (covers the all-zero case): no spread to - // render, so paint everyone neutral rather than dividing by zero. - if (max === min) { - const neutral: Record = {}; - for (const operatorId of operatorIds) { - neutral[operatorId] = 0.5; - } - return neutral; - } - - const range = max - min; - const scores: Record = {}; - for (const operatorId of operatorIds) { - scores[operatorId] = clamp((compressed[operatorId] - min) / range, 0, 1); - } - return scores; -} From 9c0204b1a0630aae6f7ba9719d6ad4fbb6710ac4 Mon Sep 17 00:00:00 2001 From: PG1204 Date: Sun, 21 Jun 2026 15:46:40 -0700 Subject: [PATCH 3/3] refactor(frontend): scope metrics capture to ground truth --- .../workspace/service/workflow-status/performance-metrics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts index e317cbf24d3..825429566ab 100644 --- a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts @@ -52,7 +52,7 @@ function toFiniteNonNegative(value: number | undefined): number { /** * Project a single raw {@link OperatorStatistics} into the flat performance model, * defaulting every optional/missing field to 0. Data and control processing time - * are kept as separate fields. + * are kept as separate fields so consumers can choose how to combine them. */ export function toPerformanceMetrics(operatorId: string, stats: OperatorStatistics): OperatorPerformanceMetrics { return {