Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions agent-service/src/agent/texera-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import {
type ExecutionConfig,
} from "./tools/workflow-execution-tools";
import { assembleContext } from "./util/context-utils";
import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api";
import { compileWorkflowAsync } from "../api/compile-api";
import type { WorkflowCompilationResponse } from "../types/dto";
import { createLogger } from "../logger";
import type { Logger } from "pino";

Expand Down Expand Up @@ -570,15 +571,18 @@ export class TexeraAgent {
onStepFinish: async ({ text, toolCalls, toolResults, usage }) => {
stepIndex++;

// The AI SDK types tc.input / tr.output as `unknown` for dynamically
// registered tools; narrow to the shapes our tools actually produce
// (object args, string results — see tools/*).
const formattedToolCalls = toolCalls?.map(tc => ({
toolName: tc.toolName,
toolCallId: tc.toolCallId,
input: tc.input,
input: tc.input as Record<string, unknown>,
}));

const formattedToolResults = toolResults?.map(tr => ({
toolCallId: tr.toolCallId,
output: tr.output,
output: tr.output as string,
isError: !!(tr.output as any)?.error,
}));

Expand Down
2 changes: 1 addition & 1 deletion agent-service/src/agent/util/context-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai";
import type { WorkflowState } from "../workflow-state";
import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow";
import type { ReActStep } from "../../types/agent";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/dto";
import { extractOperatorInputPortSchemaMap } from "./workflow-utils";
import { createLogger } from "../../logger";

Expand Down
165 changes: 165 additions & 0 deletions agent-service/src/agent/util/workflow-system-metadata.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test } from "bun:test";
import {
WorkflowSystemMetadata,
formatValidationErrors,
formatCompactSchemaForError,
} from "./workflow-system-metadata";
import type { OperatorMetadata, OperatorSchema } from "../../types/metadata";

function operator(overrides: Partial<OperatorSchema> & Pick<OperatorSchema, "operatorType">): OperatorSchema {
return {
operatorVersion: "1",
jsonSchema: {},
additionalMetadata: {
userFriendlyName: overrides.operatorType,
operatorGroupName: "Test",
inputPorts: [],
outputPorts: [],
},
...overrides,
};
}

function metadataWith(...operators: OperatorSchema[]): OperatorMetadata {
return { operators, groups: [] };
}

// Filter exercises ref-inlining + key filtering; Limit is a clean Ajv-validatable schema.
const FILTER = operator({
operatorType: "Filter",
additionalMetadata: {
userFriendlyName: "Filter",
operatorGroupName: "Filter",
inputPorts: [],
outputPorts: [],
operatorDescription: "Filters rows",
},
jsonSchema: {
properties: {
attribute: { $ref: "#/definitions/AttributeName" },
limit: { type: "integer", propertyOrder: 5 },
dummyPropertyList: { type: "array" },
},
definitions: {
AttributeName: { type: "string", title: "Attribute" },
PortDescription: { type: "object" },
},
required: ["attribute"],
},
});

const LIMIT = operator({
operatorType: "Limit",
jsonSchema: { type: "object", properties: { limit: { type: "integer" } }, required: ["limit"] },
});

function loaded(...operators: OperatorSchema[]): WorkflowSystemMetadata {
const meta = new WorkflowSystemMetadata();
meta.loadFromMetadata(metadataWith(...operators));
return meta;
}

describe("WorkflowSystemMetadata.loadFromMetadata", () => {
test("indexes operators by type", () => {
const meta = loaded(FILTER, LIMIT);
expect(meta.getOperatorCount()).toBe(2);
expect(meta.operatorTypeExists("Filter")).toBe(true);
expect(meta.operatorTypeExists("Nope")).toBe(false);
expect(meta.getSchema("Filter")).toEqual(FILTER.jsonSchema);
expect(meta.getAllOperatorTypes()).toEqual({ Filter: "Filters rows", Limit: "Limit" });
});

test("getDescription falls back to userFriendlyName when no description", () => {
const meta = loaded(FILTER, LIMIT);
expect(meta.getDescription("Filter")).toBe("Filters rows");
expect(meta.getDescription("Limit")).toBe("Limit");
expect(meta.getDescription("Unknown")).toBe("");
});
});

describe("WorkflowSystemMetadata.getCompactSchema", () => {
test("returns null for an unknown operator type", () => {
expect(loaded(FILTER).getCompactSchema("Nope")).toBeNull();
});

test("inlines $refs, strips noise keys, and drops filtered properties", () => {
const compact = loaded(FILTER).getCompactSchema("Filter");
expect(compact).not.toBeNull();
// $ref resolved to the AttributeName definition.
expect(compact!.properties.attribute).toEqual({ type: "string", title: "Attribute" });
// propertyOrder is in COMPACT_SCHEMA_EXCLUDED_KEYS and is stripped.
expect(compact!.properties.limit).toEqual({ type: "integer" });
// dummyPropertyList is in FILTERED_PROPERTY_KEYS and is removed.
expect(compact!.properties).not.toHaveProperty("dummyPropertyList");
expect(compact!.required).toEqual(["attribute"]);
});
});

describe("WorkflowSystemMetadata.getAllSchemasAsJson", () => {
test("emits filtered properties and definitions as JSON", () => {
const parsed = JSON.parse(loaded(FILTER).getAllSchemasAsJson());
expect(Object.keys(parsed.Filter.properties)).toEqual(["attribute", "limit"]); // dummyPropertyList filtered
expect(parsed.Filter.definitions).toHaveProperty("AttributeName");
expect(parsed.Filter.definitions).not.toHaveProperty("PortDescription"); // filtered definition
expect(parsed.Filter.required).toEqual(["attribute"]);
});
});

describe("WorkflowSystemMetadata.validateOperatorProperties", () => {
test("accepts properties that satisfy the schema", () => {
expect(loaded(LIMIT).validateOperatorProperties("Limit", { limit: 5 })).toEqual({ isValid: true });
});

test("reports the missing required property", () => {
const result = loaded(LIMIT).validateOperatorProperties("Limit", {});
expect(result.isValid).toBe(false);
expect(result.isValid ? {} : result.messages).toHaveProperty("limit");
});

test("rejects an unknown operator type", () => {
const result = loaded(LIMIT).validateOperatorProperties("Nope", {});
expect(result.isValid).toBe(false);
expect(result.isValid ? "" : result.messages.error).toContain("Unknown operator type");
});
});

describe("formatValidationErrors", () => {
test("returns empty string when valid", () => {
expect(formatValidationErrors({ isValid: true })).toBe("");
});

test("joins messages as 'key: msg'", () => {
expect(formatValidationErrors({ isValid: false, messages: { limit: "is required", attribute: "bad" } })).toBe(
"limit: is required; attribute: bad"
);
});
});

describe("formatCompactSchemaForError", () => {
test("renders only the required properties", () => {
const formatted = formatCompactSchemaForError({
properties: { a: { type: "string" }, b: { type: "integer" } },
required: ["a"],
});
expect(formatted).toBe('required: [a], properties: {"a":{"type":"string"}}');
});
});
12 changes: 1 addition & 11 deletions agent-service/src/agent/util/workflow-system-metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,13 @@
import Ajv from "ajv";
import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api";
import type { ValidationError, Validation } from "../../types/workflow";
import type { OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata";
import { createLogger } from "../../logger";

const log = createLogger("WorkflowSystemMetadata");

export type { ValidationError, Validation } from "../../types/workflow";

interface OperatorSchemaInfo {
properties: any;
required: any;
definitions: any;
}

interface CompactOperatorSchema {
properties: Record<string, any>;
required: string[];
}

const FILTERED_PROPERTY_KEYS = ["dummyPropertyList"];

const FILTERED_DEFINITION_KEYS = [
Expand Down
49 changes: 10 additions & 39 deletions agent-service/src/api/backend-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
*/

import { env } from "../config/env";
import type { OperatorMetadata } from "../types/metadata";

export type {
InputPortInfo,
OutputPortInfo,
OperatorAdditionalMetadata,
OperatorSchema,
GroupInfo,
OperatorMetadata,
} from "../types/metadata";

interface BackendConfig {
apiEndpoint: string;
Expand All @@ -37,45 +47,6 @@ export function getBackendConfig(): BackendConfig {
return { ...currentConfig };
}

export interface InputPortInfo {
displayName?: string;
disallowMultiLinks?: boolean;
dependencies?: { id: number; internal: boolean }[];
}

export interface OutputPortInfo {
displayName?: string;
}

interface OperatorAdditionalMetadata {
userFriendlyName: string;
operatorGroupName: string;
operatorDescription?: string;
inputPorts: InputPortInfo[];
outputPorts: OutputPortInfo[];
dynamicInputPorts?: boolean;
dynamicOutputPorts?: boolean;
supportReconfiguration?: boolean;
allowPortCustomization?: boolean;
}

export interface OperatorSchema {
operatorType: string;
jsonSchema: any;
additionalMetadata: OperatorAdditionalMetadata;
operatorVersion: string;
}

interface GroupInfo {
groupName: string;
children?: GroupInfo[] | null;
}

export interface OperatorMetadata {
operators: OperatorSchema[];
groups: GroupInfo[];
}

export async function fetchOperatorMetadata(): Promise<OperatorMetadata> {
const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`;
const response = await fetch(url);
Expand Down
97 changes: 97 additions & 0 deletions agent-service/src/api/compile-api.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { afterEach, describe, expect, mock, test } from "bun:test";
import { compileWorkflowAsync } from "./compile-api";
import type { LogicalPlan } from "../types/workflow";

const realFetch = globalThis.fetch;
afterEach(() => {
globalThis.fetch = realFetch;
});

const plan: LogicalPlan = {
operators: [{ operatorID: "op1", operatorType: "Filter" }],
links: [],
};

describe("compileWorkflowAsync", () => {
test("POSTs to /api/compile and returns the parsed compilation response", async () => {
// operatorErrors uses the proto-accurate WorkflowFatalError shape (type is the enum name string).
const responseBody = {
physicalPlan: { nodes: [] },
operatorOutputSchemas: {},
operatorErrors: {
op1: {
type: "COMPILATION_ERROR",
message: "bad attribute",
details: "stack",
operatorId: "op1",
workerId: "",
timestamp: { seconds: 1, nanos: 0 },
},
},
};
const fn = mock(async () => ({
ok: true,
status: 200,
statusText: "OK",
json: async () => responseBody,
text: async () => "",
}));
globalThis.fetch = fn as unknown as typeof fetch;

const result = await compileWorkflowAsync(plan);

const [url, init] = fn.mock.calls[0] as unknown as [string, RequestInit];
expect(url).toEndWith("/api/compile");
expect(init.method).toBe("POST");
expect(JSON.parse(init.body as string)).toEqual({
operators: plan.operators,
links: plan.links,
opsToReuseResult: [],
opsToViewResult: [],
});
expect(result).not.toBeNull();
expect(result!.operatorErrors.op1.type).toBe("COMPILATION_ERROR");
expect(result!.operatorErrors.op1.message).toBe("bad attribute");
});

test("returns null on a non-ok response", async () => {
const fn = mock(async () => ({
ok: false,
status: 400,
statusText: "Bad Request",
json: async () => ({}),
text: async () => "compile error",
}));
globalThis.fetch = fn as unknown as typeof fetch;

expect(await compileWorkflowAsync(plan)).toBeNull();
});

test("returns null when the request throws", async () => {
const fn = mock(async () => {
throw new Error("network down");
});
globalThis.fetch = fn as unknown as typeof fetch;

expect(await compileWorkflowAsync(plan)).toBeNull();
});
});
Loading
Loading