diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts new file mode 100644 index 00000000000..9de496c3e17 --- /dev/null +++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.spec.ts @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { OperatorPerformanceMetrics, toPerformanceMetrics } from "./performance-metrics"; +import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface"; + +/** + * A complete statistics object, mirroring what the backend sends once every + * field is typed. All five timing/size fields are present. 
+ */
+const fullStats: OperatorStatistics = {
+  operatorState: OperatorState.Running,
+  aggregatedInputRowCount: 1_000_000,
+  aggregatedInputSize: 84_000_000,
+  inputPortMetrics: { "0": 1_000_000 },
+  aggregatedOutputRowCount: 12_000,
+  aggregatedOutputSize: 1_010_000,
+  outputPortMetrics: { "0": 12_000 },
+  numWorkers: 4,
+  aggregatedDataProcessingTime: 8_500_000_000,
+  aggregatedControlProcessingTime: 120_000_000,
+  aggregatedIdleTime: 300_000_000,
+};
+
+/**
+ * Minimal statistics object in the shape WorkflowStatusService.resetStatus() emits:
+ * required fields only, with numWorkers and all five optional timing/size fields
+ * absent. Mapping it must yield plain zeros, never NaN or undefined.
+ */
+const partialStats: OperatorStatistics = {
+  operatorState: OperatorState.Uninitialized,
+  aggregatedInputRowCount: 0,
+  inputPortMetrics: {},
+  aggregatedOutputRowCount: 0,
+  outputPortMetrics: {},
+};
+
+describe("toPerformanceMetrics", () => {
+  it("maps every field from a full statistics object", () => {
+    const expected: OperatorPerformanceMetrics = {
+      operatorId: "Filter-operator-a1b2",
+      dataProcessingTimeNs: 8_500_000_000,
+      controlProcessingTimeNs: 120_000_000,
+      idleTimeNs: 300_000_000,
+      inputRows: 1_000_000,
+      outputRows: 12_000,
+      inputSize: 84_000_000,
+      outputSize: 1_010_000,
+      numWorkers: 4,
+    };
+    expect(toPerformanceMetrics("Filter-operator-a1b2", fullStats)).toEqual(expected);
+  });
+
+  it("keeps data and control processing time as separate fields", () => {
+    const { dataProcessingTimeNs, controlProcessingTimeNs } = toPerformanceMetrics("op", fullStats);
+    expect(dataProcessingTimeNs).toBe(fullStats.aggregatedDataProcessingTime);
+    expect(controlProcessingTimeNs).toBe(fullStats.aggregatedControlProcessingTime);
+  });
+
+  it("defaults every optional/missing field to 0 (no NaN, no undefined)", () => {
+    const metrics = toPerformanceMetrics("op", partialStats);
+    expect(metrics).toEqual({
+      operatorId: "op",
+      dataProcessingTimeNs: 0,
+      controlProcessingTimeNs: 0,
+      idleTimeNs: 0,
+      inputRows: 0,
+      outputRows: 0,
+      inputSize: 0,
+      outputSize: 0,
+      numWorkers: 0,
+    });
+    // Belt and braces: toEqual above already pins the zeros; additionally assert
+    // that no numeric field of the result is NaN.
+    const numericValues = Object.values(metrics).filter(value => typeof value === "number");
+    for (const value of numericValues) {
+      expect(Number.isNaN(value)).toBe(false);
+    }
+  });
+
+  it("coerces non-finite / negative inputs to 0", () => {
+    const malformed = {
+      operatorState: OperatorState.Running,
+      aggregatedInputRowCount: Number.POSITIVE_INFINITY,
+      inputPortMetrics: {},
+      aggregatedOutputRowCount: -5,
+      outputPortMetrics: {},
+      aggregatedDataProcessingTime: Number.NaN,
+    } as unknown as OperatorStatistics;
+    const { inputRows, outputRows, dataProcessingTimeNs } = toPerformanceMetrics("op", malformed);
+    expect(inputRows).toBe(0);
+    expect(outputRows).toBe(0);
+    expect(dataProcessingTimeNs).toBe(0);
+  });
+
+  it("preserves a unicode operator id verbatim", () => {
+    const unicodeId = "įŽ—å­-✓-1";
+    expect(toPerformanceMetrics(unicodeId, fullStats).operatorId).toBe(unicodeId);
+  });
+});
diff --git a/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts
new file mode 100644
index 00000000000..825429566ab
--- /dev/null
+++ b/frontend/src/app/workspace/service/workflow-status/performance-metrics.ts
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { OperatorStatistics } from "../../types/execute-workflow.interface";
+
+/**
+ * Flat, read-only performance snapshot of a single operator.
+ *
+ * Produced by {@link toPerformanceMetrics} from the raw {@link OperatorStatistics}
+ * and published by {@link WorkflowStatusService}. Every numeric field is a finite,
+ * non-negative number, so consumers can use the values without re-validating them.
+ */
+export interface OperatorPerformanceMetrics
+  extends Readonly<{
+    operatorId: string;
+    dataProcessingTimeNs: number;
+    controlProcessingTimeNs: number;
+    idleTimeNs: number;
+    inputRows: number;
+    outputRows: number;
+    inputSize: number;
+    outputSize: number;
+    numWorkers: number;
+  }> {}
+
+/**
+ * Sanitize one numeric field received from the backend. Returns `value` when it
+ * is a finite number greater than zero; anything else (undefined, non-number,
+ * NaN, ±Infinity, negative, or -0) yields 0 so consumers only ever see clean values.
+ */
+function toFiniteNonNegative(value: number | undefined): number {
+  return typeof value !== "number" || !Number.isFinite(value) || value <= 0 ? 0 : value;
+}
+
+/**
+ * Build the flat {@link OperatorPerformanceMetrics} record for one operator from its
+ * raw {@link OperatorStatistics}. Each numeric field is sanitized independently, so an
+ * absent optional field becomes 0; data and control processing time stay separate.
+ */
+export function toPerformanceMetrics(operatorId: string, stats: OperatorStatistics): OperatorPerformanceMetrics {
+  const clean = toFiniteNonNegative;
+  return {
+    operatorId,
+    dataProcessingTimeNs: clean(stats.aggregatedDataProcessingTime),
+    controlProcessingTimeNs: clean(stats.aggregatedControlProcessingTime),
+    idleTimeNs: clean(stats.aggregatedIdleTime),
+    inputRows: clean(stats.aggregatedInputRowCount),
+    outputRows: clean(stats.aggregatedOutputRowCount),
+    inputSize: clean(stats.aggregatedInputSize),
+    outputSize: clean(stats.aggregatedOutputSize),
+    numWorkers: clean(stats.numWorkers),
+  };
+}
diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts
new file mode 100644
index 00000000000..fb1614c9e0a
--- /dev/null
+++ b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.spec.ts
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { TestBed } from "@angular/core/testing";
+import { Subject } from "rxjs";
+import { WorkflowStatusService } from "./workflow-status.service";
+import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service";
+import { OperatorPerformanceMetrics } from "./performance-metrics";
+import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface";
+import { TexeraWebsocketEvent } from "../../types/workflow-websocket.interface";
+
+const sampleStats: OperatorStatistics = {
+  operatorState: OperatorState.Running,
+  aggregatedInputRowCount: 1_000,
+  aggregatedInputSize: 8_000,
+  inputPortMetrics: { "0": 1_000 },
+  aggregatedOutputRowCount: 250,
+  aggregatedOutputSize: 2_000,
+  outputPortMetrics: { "0": 250 },
+  numWorkers: 2,
+  aggregatedDataProcessingTime: 5_000_000,
+  aggregatedControlProcessingTime: 1_000_000,
+  aggregatedIdleTime: 700_000,
+};
+
+function statsEvent(operatorStatistics: Record<string, OperatorStatistics>): TexeraWebsocketEvent {
+  return { type: "OperatorStatisticsUpdateEvent", operatorStatistics } as TexeraWebsocketEvent;
+}
+
+describe("WorkflowStatusService", () => {
+  let service: WorkflowStatusService;
+  let websocketEventSubject: Subject<TexeraWebsocketEvent>;
+
+  beforeEach(() => {
+    websocketEventSubject = new Subject<TexeraWebsocketEvent>();
+    const websocketStub: Partial<WorkflowWebsocketService> = {
+      websocketEvent: () => websocketEventSubject.asObservable(),
+    };
+    TestBed.configureTestingModule({
+      providers: [WorkflowStatusService, { provide: WorkflowWebsocketService, useValue: websocketStub }],
+    });
+    service = TestBed.inject(WorkflowStatusService);
+  });
+
+  it("forwards an OperatorStatisticsUpdateEvent to the status stream", () => {
+    const received: Record<string, OperatorStatistics>[] = [];
+    service.getStatusUpdateStream().subscribe(s => received.push(s));
+
+    websocketEventSubject.next(statsEvent({ op1: sampleStats }));
+
+    expect(received).toHaveLength(1);
+    expect(received[0]).toEqual({ op1: sampleStats });
+    expect(service.getCurrentStatus()).toEqual({ op1: sampleStats });
+  });
+
+  it("ignores websocket events of other types", () => {
+    const received: Record<string, OperatorStatistics>[] = [];
+    service.getStatusUpdateStream().subscribe(s => received.push(s));
+
+    websocketEventSubject.next({ type: "WorkflowErrorEvent" } as unknown as TexeraWebsocketEvent);
+
+    expect(received).toHaveLength(0);
+    expect(service.getCurrentStatus()).toEqual({});
+  });
+
+  it("derives performance metrics from a status update", () => {
+    const emissions: Record<string, OperatorPerformanceMetrics>[] = [];
+    service.getPerformanceMetricsStream().subscribe(m => emissions.push(m));
+
+    websocketEventSubject.next(statsEvent({ op1: sampleStats }));
+
+    // The BehaviorSubject replays its {} seed on subscribe, so only the last emission is the derived map.
+    const latest = emissions[emissions.length - 1];
+    expect(latest["op1"]).toEqual({
+      operatorId: "op1",
+      dataProcessingTimeNs: 5_000_000,
+      controlProcessingTimeNs: 1_000_000,
+      idleTimeNs: 700_000,
+      inputRows: 1_000,
+      outputRows: 250,
+      inputSize: 8_000,
+      outputSize: 2_000,
+      numWorkers: 2,
+    });
+    expect(service.getCurrentPerformanceMetrics()).toEqual(latest);
+  });
+
+  it("seeds the performance-metrics stream with an empty map for late subscribers", () => {
+    expect(service.getCurrentPerformanceMetrics()).toEqual({});
+    let seeded: Record<string, OperatorPerformanceMetrics> | undefined;
+    service.getPerformanceMetricsStream().subscribe(m => (seeded = m));
+    expect(seeded).toEqual({});
+  });
+
+  it("defaults missing optional fields to 0 when deriving metrics", () => {
+    const partial: OperatorStatistics = {
+      operatorState: OperatorState.Uninitialized,
+      aggregatedInputRowCount: 0,
+      inputPortMetrics: {},
+      aggregatedOutputRowCount: 0,
+      outputPortMetrics: {},
+    };
+    websocketEventSubject.next(statsEvent({ op1: partial }));
+
+    const m = service.getCurrentPerformanceMetrics()["op1"];
+    expect(m.dataProcessingTimeNs).toBe(0);
+    expect(m.controlProcessingTimeNs).toBe(0);
+    expect(m.idleTimeNs).toBe(0);
+    expect(m.inputSize).toBe(0);
+    expect(m.numWorkers).toBe(0);
+  });
+
+  it("setExternalStatus feeds both the status and performance-metrics streams", () => {
+    const statuses: Record<string, OperatorStatistics>[] = [];
+    service.getStatusUpdateStream().subscribe(s => statuses.push(s));
+
+    service.setExternalStatus({ historical: sampleStats });
+
+    expect(statuses[statuses.length - 1]).toEqual({ historical: sampleStats });
+    expect(service.getCurrentStatus()).toEqual({ historical: sampleStats });
+    expect(service.getCurrentPerformanceMetrics()["historical"].dataProcessingTimeNs).toBe(5_000_000);
+  });
+
+  it("resetStatus zeros the metrics for known operators", () => {
+    websocketEventSubject.next(statsEvent({ op1: sampleStats }));
+    service.resetStatus();
+
+    const m = service.getCurrentPerformanceMetrics()["op1"];
+    expect(m).toEqual({
+      operatorId: "op1",
+      dataProcessingTimeNs: 0,
+      controlProcessingTimeNs: 0,
+      idleTimeNs: 0,
+      inputRows: 0,
+      outputRows: 0,
+      inputSize: 0,
+      outputSize: 0,
+      numWorkers: 0,
+    });
+  });
+
+  it("clearStatus empties both the status and performance-metrics snapshots", () => {
+    websocketEventSubject.next(statsEvent({ op1: sampleStats }));
+    service.clearStatus();
+
+    expect(service.getCurrentStatus()).toEqual({});
+    expect(service.getCurrentPerformanceMetrics()).toEqual({});
+  });
+});
diff --git a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts
index e939932aeba..5252900d244 100644
--- a/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts
+++ b/frontend/src/app/workspace/service/workflow-status/workflow-status.service.ts
@@ -18,9 +18,10 @@
  */
 
 import { Injectable } from "@angular/core";
-import { Observable, Subject } from "rxjs";
+import { BehaviorSubject, Observable, Subject } from "rxjs";
 import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface";
 import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service";
+import { OperatorPerformanceMetrics, toPerformanceMetrics } from "./performance-metrics";
 
 @Injectable({
providedIn: "root",
@@ -30,8 +31,18 @@ export class WorkflowStatusService {
   private statusSubject = new Subject<Record<string, OperatorStatistics>>();
   private currentStatus: Record<string, OperatorStatistics> = {};
 
+  // Derived, ground-truth performance metrics for the heat-map overlay. Backed by
+  // a BehaviorSubject so a consumer that subscribes after a run already streamed
+  // (e.g. the overlay toggled on mid/post-run) still receives the latest value.
+  private performanceMetricsSubject = new BehaviorSubject<Record<string, OperatorPerformanceMetrics>>({});
+
   constructor(private workflowWebsocketService: WorkflowWebsocketService) {
-    this.getStatusUpdateStream().subscribe(event => (this.currentStatus = event));
+    // Single derivation path: every status emission (websocket, reset, clear, or
+    // externally fed historical stats) recomputes the performance metrics.
+    this.getStatusUpdateStream().subscribe(status => {
+      this.currentStatus = status;
+      this.performanceMetricsSubject.next(this.buildPerformanceMetrics(status));
+    });
 
     this.workflowWebsocketService.websocketEvent().subscribe(event => {
       if (event.type !== "OperatorStatisticsUpdateEvent") {
@@ -49,6 +60,39 @@ export class WorkflowStatusService {
     return this.currentStatus;
   }
 
+  /** Stream of derived per-operator performance metrics, keyed by operator id. */
+  public getPerformanceMetricsStream(): Observable<Record<string, OperatorPerformanceMetrics>> {
+    return this.performanceMetricsSubject.asObservable();
+  }
+
+  /** Synchronous snapshot of the latest derived per-operator performance metrics. */
+  public getCurrentPerformanceMetrics(): Record<string, OperatorPerformanceMetrics> {
+    return this.performanceMetricsSubject.getValue();
+  }
+
+  /**
+   * Feed externally-sourced statistics (e.g. restored historical runtime stats)
+   * through the same pipeline as live websocket updates, so derived metrics and
+   * all subscribers update identically.
+   */
+  public setExternalStatus(status: Record<string, OperatorStatistics>): void {
+    this.statusSubject.next(status);
+  }
+
+  /**
+   * Recompute the derived metrics map for a full status snapshot: one
+   * {@link toPerformanceMetrics} projection per operator id present in `status`.
+   */
+  private buildPerformanceMetrics(
+    status: Record<string, OperatorStatistics>
+  ): Record<string, OperatorPerformanceMetrics> {
+    const metrics: Record<string, OperatorPerformanceMetrics> = {};
+    for (const operatorId of Object.keys(status)) {
+      metrics[operatorId] = toPerformanceMetrics(operatorId, status[operatorId]);
+    }
+    return metrics;
+  }
+
   public resetStatus(): void {
     const initStatus: Record<string, OperatorStatistics> = Object.keys(this.currentStatus).reduce(
       (accumulator, operatorId) => {
diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts
index 9b6edb00b29..1fc99e7a605 100644
--- a/frontend/src/app/workspace/types/execute-workflow.interface.ts
+++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts
@@ -82,10 +82,15 @@ export interface OperatorStatistics
   extends Readonly<{
     operatorState: OperatorState;
     aggregatedInputRowCount: number;
+    aggregatedInputSize?: number;
     inputPortMetrics: Record<string, number>;
     aggregatedOutputRowCount: number;
+    aggregatedOutputSize?: number;
     outputPortMetrics: Record<string, number>;
     numWorkers?: number;
+    aggregatedDataProcessingTime?: number;
+    aggregatedControlProcessingTime?: number;
+    aggregatedIdleTime?: number;
   }> {}
 
 export interface OperatorStatsUpdate