Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { OperatorPerformanceMetrics, toPerformanceMetrics } from "./performance-metrics";
import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface";

/**
* Fully populated statistics fixture, mirroring what the backend sends once
* every field is typed: the required fields plus all five optional
* timing/size fields and `numWorkers` are present.
*
* Every value that the mapper copies is distinct, so the whole-object
* `toEqual` in the "maps every field" test fails if a source field is wired
* to the wrong target field.
*/
const fullStats: OperatorStatistics = {
operatorState: OperatorState.Running,
aggregatedInputRowCount: 1_000_000,
// optional size field (absent from partialStats)
aggregatedInputSize: 84_000_000,
inputPortMetrics: { "0": 1_000_000 },
aggregatedOutputRowCount: 12_000,
// optional size field (absent from partialStats)
aggregatedOutputSize: 1_010_000,
outputPortMetrics: { "0": 12_000 },
numWorkers: 4,
// the three optional timing fields; the mapper exposes them as `*Ns` fields
aggregatedDataProcessingTime: 8_500_000_000,
aggregatedControlProcessingTime: 120_000_000,
aggregatedIdleTime: 300_000_000,
};

/**
* Minimal statistics fixture: the partial shape produced by
* WorkflowStatusService.resetStatus(). Only the required fields are set; the
* five optional timing/size fields and `numWorkers` are deliberately omitted.
*
* The mapper must survive this input without emitting NaN or undefined for
* any metric.
*/
const partialStats: OperatorStatistics = {
operatorState: OperatorState.Uninitialized,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
aggregatedOutputRowCount: 0,
outputPortMetrics: {},
};

// Unit tests for the pure `toPerformanceMetrics` mapper: happy path, field
// separation, defaulting of missing fields, sanitising of malformed numbers,
// and pass-through of the operator id.
describe("toPerformanceMetrics", () => {
  it("maps every field from a full statistics object", () => {
    const expected = {
      operatorId: "Filter-operator-a1b2",
      dataProcessingTimeNs: 8_500_000_000,
      controlProcessingTimeNs: 120_000_000,
      idleTimeNs: 300_000_000,
      inputRows: 1_000_000,
      outputRows: 12_000,
      inputSize: 84_000_000,
      outputSize: 1_010_000,
      numWorkers: 4,
    };

    const actual: OperatorPerformanceMetrics = toPerformanceMetrics("Filter-operator-a1b2", fullStats);

    expect(actual).toEqual(expected);
  });

  it("keeps data and control processing time as separate fields", () => {
    const metrics = toPerformanceMetrics("op", fullStats);

    // The two timings must not be summed or swapped by the mapper.
    expect(metrics.dataProcessingTimeNs).toBe(fullStats.aggregatedDataProcessingTime);
    expect(metrics.controlProcessingTimeNs).toBe(fullStats.aggregatedControlProcessingTime);
  });

  it("defaults every optional/missing field to 0 (no NaN, no undefined)", () => {
    const allZero = {
      operatorId: "op",
      dataProcessingTimeNs: 0,
      controlProcessingTimeNs: 0,
      idleTimeNs: 0,
      inputRows: 0,
      outputRows: 0,
      inputSize: 0,
      outputSize: 0,
      numWorkers: 0,
    };

    const metrics = toPerformanceMetrics("op", partialStats);

    expect(metrics).toEqual(allZero);
    // Belt and braces: check each numeric member individually for NaN.
    Object.values(metrics)
      .filter((entry): entry is number => typeof entry === "number")
      .forEach(entry => {
        expect(Number.isNaN(entry)).toBe(false);
      });
  });

  it("coerces non-finite / negative inputs to 0", () => {
    // Deliberately invalid numbers; the double cast bypasses the static type
    // because such values can only arrive at runtime.
    const corruptStats = {
      operatorState: OperatorState.Running,
      aggregatedInputRowCount: Number.POSITIVE_INFINITY,
      inputPortMetrics: {},
      aggregatedOutputRowCount: -5,
      outputPortMetrics: {},
      aggregatedDataProcessingTime: Number.NaN,
    } as unknown as OperatorStatistics;

    const metrics = toPerformanceMetrics("op", corruptStats);

    expect(metrics.inputRows).toBe(0);
    expect(metrics.outputRows).toBe(0);
    expect(metrics.dataProcessingTimeNs).toBe(0);
  });

  it("preserves a unicode operator id verbatim", () => {
    const unicodeId = "算子-✓-1";
    const { operatorId } = toPerformanceMetrics(unicodeId, fullStats);
    expect(operatorId).toBe(unicodeId);
  });
});
Comment thread
Yicong-Huang marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { OperatorStatistics } from "../../types/execute-workflow.interface";

/**
 * Per-operator performance metrics derived from raw statistics.
 *
 * This is the ground-truth model captured by {@link WorkflowStatusService}: a
 * flat projection of the raw {@link OperatorStatistics} that the backend
 * streams over the websocket. Records built by `toPerformanceMetrics` hold
 * only finite, non-negative numbers, so downstream consumers never have to
 * re-validate them. Every member is read-only.
 */
export interface OperatorPerformanceMetrics {
  /** Id of the operator this record describes. */
  readonly operatorId: string;
  /** Projection of `aggregatedDataProcessingTime`. */
  readonly dataProcessingTimeNs: number;
  /** Projection of `aggregatedControlProcessingTime`. */
  readonly controlProcessingTimeNs: number;
  /** Projection of `aggregatedIdleTime`. */
  readonly idleTimeNs: number;
  /** Projection of `aggregatedInputRowCount`. */
  readonly inputRows: number;
  /** Projection of `aggregatedOutputRowCount`. */
  readonly outputRows: number;
  /** Projection of `aggregatedInputSize`. */
  readonly inputSize: number;
  /** Projection of `aggregatedOutputSize`. */
  readonly outputSize: number;
  /** Projection of `numWorkers`. */
  readonly numWorkers: number;
}

/**
 * Sanitise a numeric field that arrived over the websocket and therefore
 * cannot be trusted.
 *
 * @param value - the raw field; may be missing at runtime.
 * @returns `value` unchanged when it is a finite number greater than zero;
 *   otherwise 0. Missing, non-numeric, NaN, infinite and negative inputs all
 *   collapse to 0, so no NaN/Infinity can leak into downstream consumers.
 */
function toFiniteNonNegative(value: number | undefined): number {
  // Runtime type check: the static type is not enforced on wire data.
  if (typeof value !== "number") {
    return 0;
  }
  // Rejects NaN as well as +/-Infinity.
  if (!Number.isFinite(value)) {
    return 0;
  }
  // Negatives (and -0) are normalised to plain 0.
  return value > 0 ? value : 0;
}

/**
* Project a single raw {@link OperatorStatistics} into the flat performance model,
* defaulting every optional/missing field to 0. Data and control processing time
* are kept as separate fields so consumers can choose how to combine them.
*/
Comment thread
PG1204 marked this conversation as resolved.
export function toPerformanceMetrics(operatorId: string, stats: OperatorStatistics): OperatorPerformanceMetrics {
  // Pull the raw fields out first; the optional ones may be undefined here.
  const {
    aggregatedDataProcessingTime,
    aggregatedControlProcessingTime,
    aggregatedIdleTime,
    aggregatedInputRowCount,
    aggregatedOutputRowCount,
    aggregatedInputSize,
    aggregatedOutputSize,
    numWorkers,
  } = stats;

  // Every numeric field goes through the same sanitiser, so absent or
  // malformed values surface as 0. Data and control time stay separate.
  const metrics: OperatorPerformanceMetrics = {
    operatorId,
    dataProcessingTimeNs: toFiniteNonNegative(aggregatedDataProcessingTime),
    controlProcessingTimeNs: toFiniteNonNegative(aggregatedControlProcessingTime),
    idleTimeNs: toFiniteNonNegative(aggregatedIdleTime),
    inputRows: toFiniteNonNegative(aggregatedInputRowCount),
    outputRows: toFiniteNonNegative(aggregatedOutputRowCount),
    inputSize: toFiniteNonNegative(aggregatedInputSize),
    outputSize: toFiniteNonNegative(aggregatedOutputSize),
    numWorkers: toFiniteNonNegative(numWorkers),
  };
  return metrics;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { TestBed } from "@angular/core/testing";
import { Subject } from "rxjs";
import { WorkflowStatusService } from "./workflow-status.service";
import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service";
import { OperatorPerformanceMetrics } from "./performance-metrics";
import { OperatorState, OperatorStatistics } from "../../types/execute-workflow.interface";
import { TexeraWebsocketEvent } from "../../types/workflow-websocket.interface";

/**
* Fully populated statistics fixture shared by the tests below. Every optional
* timing/size field and `numWorkers` is set, and the values match the derived
* metrics asserted in "derives performance metrics from a status update".
*/
const sampleStats: OperatorStatistics = {
operatorState: OperatorState.Running,
aggregatedInputRowCount: 1_000,
aggregatedInputSize: 8_000,
inputPortMetrics: { "0": 1_000 },
aggregatedOutputRowCount: 250,
aggregatedOutputSize: 2_000,
outputPortMetrics: { "0": 250 },
numWorkers: 2,
aggregatedDataProcessingTime: 5_000_000,
aggregatedControlProcessingTime: 1_000_000,
aggregatedIdleTime: 700_000,
};

/**
 * Wrap a per-operator statistics map in an "OperatorStatisticsUpdateEvent"
 * websocket event, for pushing through the stubbed websocket stream.
 *
 * @param operatorStatistics - statistics keyed by operator id.
 * @returns the event object carrying that map.
 */
function statsEvent(operatorStatistics: Record<string, OperatorStatistics>): TexeraWebsocketEvent {
  const payload = {
    type: "OperatorStatisticsUpdateEvent" as const,
    operatorStatistics: operatorStatistics,
  };
  // The cast selects the matching member of the event type for the test.
  return payload as TexeraWebsocketEvent;
}

// Tests for WorkflowStatusService, driven through a stubbed websocket service
// whose event stream is a Subject the tests push events into by hand.
describe("WorkflowStatusService", () => {
let service: WorkflowStatusService;
// Stand-in for the websocket event stream; tests call next() on it directly.
let websocketEventSubject: Subject<TexeraWebsocketEvent>;

beforeEach(() => {
// Fresh subject and service per test so no state carries over between tests.
websocketEventSubject = new Subject<TexeraWebsocketEvent>();
// Only websocketEvent() is stubbed; it exposes the subject as an observable.
const websocketStub: Partial<WorkflowWebsocketService> = {
websocketEvent: () => websocketEventSubject.asObservable(),
};
TestBed.configureTestingModule({
providers: [WorkflowStatusService, { provide: WorkflowWebsocketService, useValue: websocketStub }],
});
service = TestBed.inject(WorkflowStatusService);
});

// A statistics event must reach both the status stream and the current snapshot.
it("forwards an OperatorStatisticsUpdateEvent to the status stream", () => {
const received: Record<string, OperatorStatistics>[] = [];
// Subscribe before emitting so the emission is recorded.
service.getStatusUpdateStream().subscribe(s => received.push(s));

websocketEventSubject.next(statsEvent({ op1: sampleStats }));

expect(received).toHaveLength(1);
expect(received[0]).toEqual({ op1: sampleStats });
expect(service.getCurrentStatus()).toEqual({ op1: sampleStats });
});

// An event with a different `type` must leave the stream and snapshot untouched.
it("ignores websocket events of other types", () => {
const received: Record<string, OperatorStatistics>[] = [];
service.getStatusUpdateStream().subscribe(s => received.push(s));

// Only `type` is set, hence the double cast to satisfy the event type.
websocketEventSubject.next({ type: "WorkflowErrorEvent" } as unknown as TexeraWebsocketEvent);

expect(received).toHaveLength(0);
expect(service.getCurrentStatus()).toEqual({});
});

// The metrics stream must carry the projection of the incoming statistics.
it("derives performance metrics from a status update", () => {
const emissions: Record<string, OperatorPerformanceMetrics>[] = [];
service.getPerformanceMetricsStream().subscribe(m => emissions.push(m));

websocketEventSubject.next(statsEvent({ op1: sampleStats }));

// BehaviorSubject seeds {} then emits the derived map, so only the last
// emission is inspected.
const latest = emissions[emissions.length - 1];
expect(latest["op1"]).toEqual({
operatorId: "op1",
dataProcessingTimeNs: 5_000_000,
controlProcessingTimeNs: 1_000_000,
idleTimeNs: 700_000,
inputRows: 1_000,
outputRows: 250,
inputSize: 8_000,
outputSize: 2_000,
numWorkers: 2,
});
// The synchronous getter must agree with the latest stream emission.
expect(service.getCurrentPerformanceMetrics()).toEqual(latest);
});

// With no event sent yet, both the getter and a new subscriber see {}.
it("seeds the performance-metrics stream with an empty map for late subscribers", () => {
expect(service.getCurrentPerformanceMetrics()).toEqual({});
let seeded: Record<string, OperatorPerformanceMetrics> | undefined;
// The callback is expected to run synchronously on subscribe.
service.getPerformanceMetricsStream().subscribe(m => (seeded = m));
expect(seeded).toEqual({});
});

// Statistics without the optional fields must still yield numeric zeros.
it("defaults missing optional fields to 0 when deriving metrics", () => {
// Required fields only: no timing/size fields and no numWorkers.
const partial: OperatorStatistics = {
operatorState: OperatorState.Uninitialized,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
aggregatedOutputRowCount: 0,
outputPortMetrics: {},
};
websocketEventSubject.next(statsEvent({ op1: partial }));

const m = service.getCurrentPerformanceMetrics()["op1"];
expect(m.dataProcessingTimeNs).toBe(0);
expect(m.controlProcessingTimeNs).toBe(0);
expect(m.idleTimeNs).toBe(0);
expect(m.inputSize).toBe(0);
expect(m.numWorkers).toBe(0);
});

// Status injected via setExternalStatus (no websocket event) must update
// the status stream, the status snapshot and the derived metrics.
it("setExternalStatus feeds both the status and performance-metrics streams", () => {
const statuses: Record<string, OperatorStatistics>[] = [];
service.getStatusUpdateStream().subscribe(s => statuses.push(s));

service.setExternalStatus({ historical: sampleStats });

expect(statuses[statuses.length - 1]).toEqual({ historical: sampleStats });
expect(service.getCurrentStatus()).toEqual({ historical: sampleStats });
expect(service.getCurrentPerformanceMetrics()["historical"].dataProcessingTimeNs).toBe(5_000_000);
});

// After resetStatus the operator entry must remain, with every metric at 0.
it("resetStatus zeros the metrics for known operators", () => {
// Make op1 known to the service before resetting.
websocketEventSubject.next(statsEvent({ op1: sampleStats }));
service.resetStatus();

const m = service.getCurrentPerformanceMetrics()["op1"];
expect(m).toEqual({
operatorId: "op1",
dataProcessingTimeNs: 0,
controlProcessingTimeNs: 0,
idleTimeNs: 0,
inputRows: 0,
outputRows: 0,
inputSize: 0,
outputSize: 0,
numWorkers: 0,
});
});

// Unlike resetStatus, clearStatus must drop the operator entries entirely.
it("clearStatus empties both the status and performance-metrics snapshots", () => {
websocketEventSubject.next(statsEvent({ op1: sampleStats }));
service.clearStatus();

expect(service.getCurrentStatus()).toEqual({});
expect(service.getCurrentPerformanceMetrics()).toEqual({});
});
});
Loading
Loading