diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..7bbd83e03ea 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -48,7 +48,8 @@ import { type ExecutionConfig, } from "./tools/workflow-execution-tools"; import { assembleContext } from "./util/context-utils"; -import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; +import { compileWorkflowAsync } from "../api/compile-api"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; import type { Logger } from "pino"; @@ -570,15 +571,18 @@ export class TexeraAgent { onStepFinish: async ({ text, toolCalls, toolResults, usage }) => { stepIndex++; + // The AI SDK types tc.input / tr.output as `unknown` for dynamically + // registered tools; narrow to the shapes our tools actually produce + // (object args, string results — see tools/*). const formattedToolCalls = toolCalls?.map(tc => ({ toolName: tc.toolName, toolCallId: tc.toolCallId, - input: tc.input, + input: tc.input as Record, })); const formattedToolResults = toolResults?.map(tr => ({ toolCallId: tr.toolCallId, - output: tr.output, + output: tr.output as string, isError: !!(tr.output as any)?.error, })); diff --git a/agent-service/src/agent/tools/result-formatting.test.ts b/agent-service/src/agent/tools/result-formatting.spec.ts similarity index 100% rename from agent-service/src/agent/tools/result-formatting.test.ts rename to agent-service/src/agent/tools/result-formatting.spec.ts diff --git a/agent-service/src/agent/tools/tools-utility.test.ts b/agent-service/src/agent/tools/tools-utility.spec.ts similarity index 100% rename from agent-service/src/agent/tools/tools-utility.test.ts rename to agent-service/src/agent/tools/tools-utility.spec.ts diff --git a/agent-service/src/agent/util/auto-layout.test.ts b/agent-service/src/agent/util/auto-layout.spec.ts similarity index 100% rename from agent-service/src/agent/util/auto-layout.test.ts rename to agent-service/src/agent/util/auto-layout.spec.ts diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 195692cbf50..04e801e3aa7 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai"; import type { WorkflowState } from "../workflow-state"; import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow"; import type { ReActStep } from "../../types/agent"; -import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api"; +import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/dto"; import { extractOperatorInputPortSchemaMap } from "./workflow-utils"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/agent/util/workflow-system-metadata.spec.ts b/agent-service/src/agent/util/workflow-system-metadata.spec.ts new file mode 100644 index 00000000000..cc2976850f4 --- /dev/null +++ b/agent-service/src/agent/util/workflow-system-metadata.spec.ts @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { + WorkflowSystemMetadata, + formatValidationErrors, + formatCompactSchemaForError, +} from "./workflow-system-metadata"; +import type { OperatorMetadata, OperatorSchema } from "../../types/metadata"; + +function operator(overrides: Partial & Pick): OperatorSchema { + return { + operatorVersion: "1", + jsonSchema: {}, + additionalMetadata: { + userFriendlyName: overrides.operatorType, + operatorGroupName: "Test", + inputPorts: [], + outputPorts: [], + }, + ...overrides, + }; +} + +function metadataWith(...operators: OperatorSchema[]): OperatorMetadata { + return { operators, groups: [] }; +} + +// Filter exercises ref-inlining + key filtering; Limit is a clean Ajv-validatable schema. +const FILTER = operator({ + operatorType: "Filter", + additionalMetadata: { + userFriendlyName: "Filter", + operatorGroupName: "Filter", + inputPorts: [], + outputPorts: [], + operatorDescription: "Filters rows", + }, + jsonSchema: { + properties: { + attribute: { $ref: "#/definitions/AttributeName" }, + limit: { type: "integer", propertyOrder: 5 }, + dummyPropertyList: { type: "array" }, + }, + definitions: { + AttributeName: { type: "string", title: "Attribute" }, + PortDescription: { type: "object" }, + }, + required: ["attribute"], + }, +}); + +const LIMIT = operator({ + operatorType: "Limit", + jsonSchema: { type: "object", properties: { limit: { type: "integer" } }, required: ["limit"] }, +}); + +function loaded(...operators: OperatorSchema[]): WorkflowSystemMetadata { + const meta = new WorkflowSystemMetadata(); + meta.loadFromMetadata(metadataWith(...operators)); + return meta; +} + +describe("WorkflowSystemMetadata.loadFromMetadata", () => { + test("indexes operators by type", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getOperatorCount()).toBe(2); + expect(meta.operatorTypeExists("Filter")).toBe(true); + expect(meta.operatorTypeExists("Nope")).toBe(false); + expect(meta.getSchema("Filter")).toEqual(FILTER.jsonSchema); + expect(meta.getAllOperatorTypes()).toEqual({ Filter: "Filters rows", Limit: "Limit" }); + }); + + test("getDescription falls back to userFriendlyName when no description", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getDescription("Filter")).toBe("Filters rows"); + expect(meta.getDescription("Limit")).toBe("Limit"); + expect(meta.getDescription("Unknown")).toBe(""); + }); +}); + +describe("WorkflowSystemMetadata.getCompactSchema", () => { + test("returns null for an unknown operator type", () => { + expect(loaded(FILTER).getCompactSchema("Nope")).toBeNull(); + }); + + test("inlines $refs, strips noise keys, and drops filtered properties", () => { + const compact = loaded(FILTER).getCompactSchema("Filter"); + expect(compact).not.toBeNull(); + // $ref resolved to the AttributeName definition. + expect(compact!.properties.attribute).toEqual({ type: "string", title: "Attribute" }); + // propertyOrder is in COMPACT_SCHEMA_EXCLUDED_KEYS and is stripped. + expect(compact!.properties.limit).toEqual({ type: "integer" }); + // dummyPropertyList is in FILTERED_PROPERTY_KEYS and is removed. + expect(compact!.properties).not.toHaveProperty("dummyPropertyList"); + expect(compact!.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.getAllSchemasAsJson", () => { + test("emits filtered properties and definitions as JSON", () => { + const parsed = JSON.parse(loaded(FILTER).getAllSchemasAsJson()); + expect(Object.keys(parsed.Filter.properties)).toEqual(["attribute", "limit"]); // dummyPropertyList filtered + expect(parsed.Filter.definitions).toHaveProperty("AttributeName"); + expect(parsed.Filter.definitions).not.toHaveProperty("PortDescription"); // filtered definition + expect(parsed.Filter.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.validateOperatorProperties", () => { + test("accepts properties that satisfy the schema", () => { + expect(loaded(LIMIT).validateOperatorProperties("Limit", { limit: 5 })).toEqual({ isValid: true }); + }); + + test("reports the missing required property", () => { + const result = loaded(LIMIT).validateOperatorProperties("Limit", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? {} : result.messages).toHaveProperty("limit"); + }); + + test("rejects an unknown operator type", () => { + const result = loaded(LIMIT).validateOperatorProperties("Nope", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? "" : result.messages.error).toContain("Unknown operator type"); + }); +}); + +describe("formatValidationErrors", () => { + test("returns empty string when valid", () => { + expect(formatValidationErrors({ isValid: true })).toBe(""); + }); + + test("joins messages as 'key: msg'", () => { + expect(formatValidationErrors({ isValid: false, messages: { limit: "is required", attribute: "bad" } })).toBe( + "limit: is required; attribute: bad" + ); + }); +}); + +describe("formatCompactSchemaForError", () => { + test("renders only the required properties", () => { + const formatted = formatCompactSchemaForError({ + properties: { a: { type: "string" }, b: { type: "integer" } }, + required: ["a"], + }); + expect(formatted).toBe('required: [a], properties: {"a":{"type":"string"}}'); + }); +}); diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index 9269a0cff7c..da4fcdf3630 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ -20,23 +20,13 @@ import Ajv from "ajv"; import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; import type { ValidationError, Validation } from "../../types/workflow"; +import type { OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata"; import { createLogger } from "../../logger"; const log = createLogger("WorkflowSystemMetadata"); export type { ValidationError, Validation } from "../../types/workflow"; -interface OperatorSchemaInfo { - properties: any; - required: any; - definitions: any; -} - -interface CompactOperatorSchema { - properties: Record; - required: string[]; -} - const FILTERED_PROPERTY_KEYS = ["dummyPropertyList"]; const FILTERED_DEFINITION_KEYS = [ diff --git a/agent-service/src/agent/workflow-result-state.test.ts b/agent-service/src/agent/workflow-result-state.spec.ts similarity index 100% rename from agent-service/src/agent/workflow-result-state.test.ts rename to agent-service/src/agent/workflow-result-state.spec.ts diff --git a/agent-service/src/agent/workflow-state.test.ts b/agent-service/src/agent/workflow-state.spec.ts similarity index 100% rename from agent-service/src/agent/workflow-state.test.ts rename to agent-service/src/agent/workflow-state.spec.ts diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/api/backend-api.ts index ffd2c59433f..56e5ada4d64 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/api/backend-api.ts @@ -18,6 +18,16 @@ */ import { env } from "../config/env"; +import type { OperatorMetadata } from "../types/metadata"; + +export type { + InputPortInfo, + OutputPortInfo, + OperatorAdditionalMetadata, + OperatorSchema, + GroupInfo, + OperatorMetadata, +} from "../types/metadata"; interface BackendConfig { apiEndpoint: string; @@ -37,45 +47,6 @@ export function getBackendConfig(): BackendConfig { return { ...currentConfig }; } -export interface InputPortInfo { - displayName?: string; - disallowMultiLinks?: boolean; - dependencies?: { id: number; internal: boolean }[]; -} - -export interface OutputPortInfo { - displayName?: string; -} - -interface OperatorAdditionalMetadata { - userFriendlyName: string; - operatorGroupName: string; - operatorDescription?: string; - inputPorts: InputPortInfo[]; - outputPorts: OutputPortInfo[]; - dynamicInputPorts?: boolean; - dynamicOutputPorts?: boolean; - supportReconfiguration?: boolean; - allowPortCustomization?: boolean; -} - -export interface OperatorSchema { - operatorType: string; - jsonSchema: any; - additionalMetadata: OperatorAdditionalMetadata; - operatorVersion: string; -} - -interface GroupInfo { - groupName: string; - children?: GroupInfo[] | null; -} - -export interface OperatorMetadata { - operators: OperatorSchema[]; - groups: GroupInfo[]; -} - export async function fetchOperatorMetadata(): Promise { const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; const response = await fetch(url); diff --git a/agent-service/src/api/compile-api.spec.ts b/agent-service/src/api/compile-api.spec.ts new file mode 100644 index 00000000000..270463db20a --- /dev/null +++ b/agent-service/src/api/compile-api.spec.ts @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { compileWorkflowAsync } from "./compile-api"; +import type { LogicalPlan } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +const plan: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "Filter" }], + links: [], +}; + +describe("compileWorkflowAsync", () => { + test("POSTs to /api/compile and returns the parsed compilation response", async () => { + // operatorErrors uses the proto-accurate WorkflowFatalError shape (type is the enum name string). + const responseBody = { + physicalPlan: { nodes: [] }, + operatorOutputSchemas: {}, + operatorErrors: { + op1: { + type: "COMPILATION_ERROR", + message: "bad attribute", + details: "stack", + operatorId: "op1", + workerId: "", + timestamp: { seconds: 1, nanos: 0 }, + }, + }, + }; + const fn = mock(async () => ({ + ok: true, + status: 200, + statusText: "OK", + json: async () => responseBody, + text: async () => "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + const result = await compileWorkflowAsync(plan); + + const [url, init] = fn.mock.calls[0] as unknown as [string, RequestInit]; + expect(url).toEndWith("/api/compile"); + expect(init.method).toBe("POST"); + expect(JSON.parse(init.body as string)).toEqual({ + operators: plan.operators, + links: plan.links, + opsToReuseResult: [], + opsToViewResult: [], + }); + expect(result).not.toBeNull(); + expect(result!.operatorErrors.op1.type).toBe("COMPILATION_ERROR"); + expect(result!.operatorErrors.op1.message).toBe("bad attribute"); + }); + + test("returns null on a non-ok response", async () => { + const fn = mock(async () => ({ + ok: false, + status: 400, + statusText: "Bad Request", + json: async () => ({}), + text: async () => "compile error", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); + + test("returns null when the request throws", async () => { + const fn = mock(async () => { + throw new Error("network down"); + }); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); +}); diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index 8ffd27fd52c..defd02344a1 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -18,7 +18,8 @@ */ import { getBackendConfig } from "./backend-api"; -import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import type { LogicalPlan } from "../types/workflow"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; const log = createLogger("CompileAPI"); @@ -30,18 +31,6 @@ export interface SchemaAttribute { export type PortSchema = ReadonlyArray; -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - -export interface WorkflowCompilationResponse { - physicalPlan?: any; - operatorOutputSchemas: Record; - operatorErrors: Record; -} - export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { const config = getBackendConfig(); const url = `${config.compileEndpoint}/api/compile`; diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts index eca292d7ffe..1efba63ed67 100644 --- a/agent-service/src/api/index.ts +++ b/agent-service/src/api/index.ts @@ -20,5 +20,5 @@ export * from "./backend-api"; export * from "./execution-api"; export * from "./workflow-api"; -export * from "./auth-api"; +export * from "../auth/jwt"; export * from "./compile-api"; diff --git a/agent-service/src/api/workflow-api.spec.ts b/agent-service/src/api/workflow-api.spec.ts new file mode 100644 index 00000000000..8a8ecb3ed69 --- /dev/null +++ b/agent-service/src/api/workflow-api.spec.ts @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { persistWorkflow, retrieveWorkflow } from "./workflow-api"; +import type { WorkflowContent } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +interface FakeResponseInit { + ok: boolean; + status?: number; + statusText?: string; + json?: unknown; + text?: string; +} + +function mockFetch(init: FakeResponseInit) { + const fn = mock(async () => ({ + ok: init.ok, + status: init.status ?? (init.ok ? 200 : 500), + statusText: init.statusText ?? "", + json: async () => init.json, + text: async () => init.text ?? "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + return fn; +} + +const content: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, +}; + +function lastCall(fn: ReturnType): [string, RequestInit] { + return fn.mock.calls[0] as unknown as [string, RequestInit]; +} + +describe("persistWorkflow", () => { + test("POSTs to /workflow/persist with bearer auth and a stringified content body", async () => { + const fn = mockFetch({ ok: true, json: { wid: 1, name: "wf", content: JSON.stringify(content) } }); + + const result = await persistWorkflow("tok", 1, "wf", content, "desc"); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/persist"); + expect(init.method).toBe("POST"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(JSON.parse(init.body as string)).toEqual({ + wid: 1, + name: "wf", + description: "desc", + content: JSON.stringify(content), + isPublic: false, + }); + // The stringified content in the response is parsed back into an object. + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 500, statusText: "Server Error", text: "boom" }); + expect(persistWorkflow("tok", 1, "wf", content)).rejects.toThrow("Failed to persist workflow: 500"); + }); +}); + +describe("retrieveWorkflow", () => { + test("GETs /workflow/:wid with bearer auth and parses stringified content", async () => { + const fn = mockFetch({ ok: true, json: { wid: 7, name: "wf", content: JSON.stringify(content) } }); + + const result = await retrieveWorkflow("tok", 7); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/7"); + expect(init.method).toBe("GET"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(result.content).toEqual(content); + }); + + test("leaves an already-parsed content object untouched", async () => { + mockFetch({ ok: true, json: { wid: 7, name: "wf", content } }); + const result = await retrieveWorkflow("tok", 7); + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 404, statusText: "Not Found", text: "missing" }); + expect(retrieveWorkflow("tok", 7)).rejects.toThrow("Failed to retrieve workflow: 404"); + }); +}); diff --git a/agent-service/src/api/workflow-api.ts b/agent-service/src/api/workflow-api.ts index 7a96f979a1c..39387563b6e 100644 --- a/agent-service/src/api/workflow-api.ts +++ b/agent-service/src/api/workflow-api.ts @@ -18,26 +18,9 @@ */ import { getBackendConfig } from "./backend-api"; -import { createAuthHeaders } from "./auth-api"; +import { createAuthHeaders } from "../auth/jwt"; import type { WorkflowContent } from "../types/workflow"; - -export interface Workflow { - wid: number; - name: string; - description?: string; - content: WorkflowContent; - creationTime?: number; - lastModifiedTime?: number; - isPublished?: boolean; -} - -interface WorkflowPersistRequest { - wid?: number; - name: string; - description?: string; - content: string; - isPublic?: boolean; -} +import type { Workflow, WorkflowPersistRequest } from "../types/dto"; const WORKFLOW_BASE_URL = "workflow"; diff --git a/agent-service/src/auth/jwt.spec.ts b/agent-service/src/auth/jwt.spec.ts new file mode 100644 index 00000000000..4ba403dcb41 --- /dev/null +++ b/agent-service/src/auth/jwt.spec.ts @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { extractUserFromToken, validateToken, extractBearerToken, createAuthHeaders } from "./jwt"; + +// Encode segments as base64url (no padding, `-`/`_` alphabet) to match real JWTs. +function makeToken(payload: Record): string { + const encode = (o: Record) => Buffer.from(JSON.stringify(o)).toString("base64url"); + return `${encode({ alg: "none", typ: "JWT" })}.${encode(payload)}.signature`; +} + +const nowSeconds = () => Math.floor(Date.now() / 1000); + +describe("extractUserFromToken", () => { + test("maps JWT claims onto a UserInfo", () => { + const token = makeToken({ userId: 7, sub: "alice", email: "alice@example.com", role: "ADMIN" }); + expect(extractUserFromToken(token)).toEqual({ uid: 7, name: "alice", email: "alice@example.com", role: "ADMIN" }); + }); + + test("defaults missing email and role", () => { + const token = makeToken({ userId: 1, sub: "bob" }); + expect(extractUserFromToken(token)).toEqual({ uid: 1, name: "bob", email: "", role: "REGULAR" }); + }); + + test("throws on a malformed token", () => { + expect(() => extractUserFromToken("not-a-jwt")).toThrow("Failed to decode JWT"); + }); + + test("decodes a token whose base64url payload contains -/_ characters", () => { + const token = makeToken({ userId: 9, sub: "a~?>>", email: "x@y.z" }); + // Guard that this case stays meaningful: the payload segment must use the url-safe alphabet. + expect(token.split(".")[1]).toMatch(/[-_]/); + expect(extractUserFromToken(token)).toEqual({ uid: 9, name: "a~?>>", email: "x@y.z", role: "REGULAR" }); + }); +}); + +describe("validateToken", () => { + test("accepts a token expiring in the future", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() + 3600 }))).toBe(true); + }); + + test("rejects an expired token", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() - 3600 }))).toBe(false); + }); + + test("treats a token without exp as valid", () => { + expect(validateToken(makeToken({ sub: "a" }))).toBe(true); + }); + + test("rejects a malformed token", () => { + expect(validateToken("garbage")).toBe(false); + }); +}); + +describe("extractBearerToken", () => { + test("extracts the token from a Bearer header", () => { + expect(extractBearerToken("Bearer abc.def.ghi")).toBe("abc.def.ghi"); + }); + + test("is case-insensitive on the scheme", () => { + expect(extractBearerToken("bearer xyz")).toBe("xyz"); + }); + + test("returns undefined for a non-Bearer scheme", () => { + expect(extractBearerToken("Basic abc")).toBeUndefined(); + }); + + test("returns undefined when the token is missing", () => { + expect(extractBearerToken("Bearer")).toBeUndefined(); + }); + + test("returns undefined for an absent header", () => { + expect(extractBearerToken(undefined)).toBeUndefined(); + }); +}); + +describe("createAuthHeaders", () => { + test("builds bearer auth headers", () => { + expect(createAuthHeaders("tok")).toEqual({ Authorization: "Bearer tok", "Content-Type": "application/json" }); + }); +}); diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/auth/jwt.ts similarity index 100% rename from agent-service/src/api/auth-api.ts rename to agent-service/src/auth/jwt.ts diff --git a/agent-service/src/config/endpoints.ts b/agent-service/src/config/endpoints.ts new file mode 100644 index 00000000000..cab8c7ff20c --- /dev/null +++ b/agent-service/src/config/endpoints.ts @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { env } from "./env"; + +/** Base URLs of the backend services this agent service talks to. */ +export interface ServiceEndpoints { + apiEndpoint: string; + modelsEndpoint: string; + compileEndpoint: string; + executionEndpoint: string; +} + +const endpoints: ServiceEndpoints = { + apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, + modelsEndpoint: env.LLM_ENDPOINT, + compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, + executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, +}; + +export function getServiceEndpoints(): ServiceEndpoints { + return { ...endpoints }; +} diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.spec.ts similarity index 100% rename from agent-service/src/server.test.ts rename to agent-service/src/server.spec.ts diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 0da3f693797..0525008400b 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -23,7 +23,7 @@ import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; -import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; +import { extractBearerToken, extractUserFromToken, validateToken } from "./auth/jwt"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; @@ -40,6 +40,7 @@ import type { ReActStep, } from "./types/agent"; import { OperatorResultSerializationMode } from "./types/agent"; +import type { WsClientRequest, WsServerMessage, WsServerInitMessage, OperatorResultSummaryWs } from "./types/ws"; const agentStore = new Map(); let agentCounter = 0; @@ -410,37 +411,6 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -interface WsMessage { - type: "message" | "stop"; - content?: string; - messageSource?: "chat" | "feedback"; -} - -interface OperatorResultSummaryWs { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; -} - -interface WsOutgoingMessage { - type: "step" | "state" | "error" | "complete" | "init" | "headChange"; - step?: ReActStep; - state?: string; - error?: string; - steps?: ReActStep[]; - headId?: string; - operatorResults?: Record; - workflowContent?: any; -} - function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); @@ -464,7 +434,7 @@ function getOperatorResultSummaries(agent: TexeraAgent): Record) { for (const route of wsRoutes) { console.log(` WS ${route.path}`); } - console.log(" Send: { type: 'message', content: '...' }"); - console.log(" Send: { type: 'stop' }"); - console.log(" Recv: { type: 'step' | 'state' | 'complete' | 'error' | 'init', ... }"); + console.log(" Send: { type: 'prompt', content: '...' }"); + console.log(" Send: { type: 'command', commandType: 'stop' }"); + console.log(" Recv: { type: 'step' | 'state' | 'complete' | 'error' | 'init' | 'headChange', ... }"); } console.log(""); diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 694b51785fd..03c3524a837 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -17,6 +17,7 @@ * under the License. */ +import type { ModelMessage } from "ai"; import type { WorkflowContent } from "./workflow"; export enum AgentState { @@ -48,15 +49,15 @@ export interface ReActStep { toolCalls?: Array<{ toolName: string; toolCallId: string; - input: any; + input: Record; }>; toolResults?: Array<{ toolCallId: string; - output: any; + output: string; isError?: boolean; }>; usage?: TokenUsage; - inputMessages?: any[]; + inputMessages?: ModelMessage[]; messageSource?: "chat" | "feedback"; beforeWorkflowContent?: WorkflowContent; afterWorkflowContent?: WorkflowContent; diff --git a/agent-service/src/types/dto.ts b/agent-service/src/types/dto.ts new file mode 100644 index 00000000000..f740d930dc1 --- /dev/null +++ b/agent-service/src/types/dto.ts @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// DTOs: request/response bodies exchanged with backend services. Distinct from +// domain types (workflow.ts, execution.ts, agent.ts) which model in-memory +// state, and from ws.ts which carries this service's own WebSocket frames. + +import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; + +// --- Dashboard Service: workflow persistence --- + +export interface Workflow { + wid: number; + name: string; + description?: string; + content: WorkflowContent; + creationTime?: number; + lastModifiedTime?: number; + isPublished?: boolean; +} + +export interface WorkflowPersistRequest { + wid?: number; + name: string; + description?: string; + content: string; + isPublic?: boolean; +} + +// --- Workflow Compiling Service --- + +export interface WorkflowFatalError { + // FatalErrorType enum name, e.g. "COMPILATION_ERROR" | "EXECUTION_FAILURE". + type: string; + message: string; + details?: string; + operatorId?: string; + workerId?: string; + timestamp?: { seconds: number; nanos: number }; +} + +export interface WorkflowCompilationResponse { + physicalPlan?: unknown; + operatorOutputSchemas: Record; + operatorErrors: Record; +} diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index c6d7291e51d..64227cc909f 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -19,4 +19,7 @@ export * from "./workflow"; export * from "./execution"; +export * from "./metadata"; export * from "./agent"; +export * from "./dto"; +export * from "./ws"; diff --git a/agent-service/src/types/metadata.ts b/agent-service/src/types/metadata.ts new file mode 100644 index 00000000000..49a3bdeed98 --- /dev/null +++ b/agent-service/src/types/metadata.ts @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Operator metadata shapes served by the Dashboard Service +// (`/api/resources/operator-metadata`) and the compact variants the agent +// derives from them for prompts and validation. + +export interface InputPortInfo { + displayName?: string; + disallowMultiLinks?: boolean; + dependencies?: { id: number; internal: boolean }[]; +} + +export interface OutputPortInfo { + displayName?: string; +} + +export interface OperatorAdditionalMetadata { + userFriendlyName: string; + operatorGroupName: string; + operatorDescription?: string; + inputPorts: InputPortInfo[]; + outputPorts: OutputPortInfo[]; + dynamicInputPorts?: boolean; + dynamicOutputPorts?: boolean; + supportReconfiguration?: boolean; + allowPortCustomization?: boolean; +} + +export interface OperatorSchema { + operatorType: string; + jsonSchema: Record; + additionalMetadata: OperatorAdditionalMetadata; + operatorVersion: string; +} + +export interface GroupInfo { + groupName: string; + children?: GroupInfo[] | null; +} + +export interface OperatorMetadata { + operators: OperatorSchema[]; + groups: GroupInfo[]; +} + +/** Full per-operator schema slice surfaced to debugging/inspection callers. */ +export interface OperatorSchemaInfo { + properties: Record; + required: string[]; + definitions: Record; +} + +/** Reduced operator schema (refs inlined, noise stripped) used in prompts and errors. */ +export interface CompactOperatorSchema { + properties: Record; + required: string[]; +} diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 52c6493cf5f..241b4d9e83c 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -52,7 +52,7 @@ export interface OperatorPredicate { readonly operatorID: string; readonly operatorType: string; readonly operatorVersion: string; - readonly operatorProperties: Record; + readonly operatorProperties: Record; readonly inputPorts: PortDescription[]; readonly outputPorts: PortDescription[]; readonly dynamicInputPorts?: boolean; @@ -67,7 +67,7 @@ export interface OperatorPredicate { export interface LogicalOperator { readonly operatorID: string; readonly operatorType: string; - readonly [key: string]: any; + readonly [key: string]: unknown; } export interface OperatorLink { @@ -131,7 +131,7 @@ export interface OperatorDetail { operatorId: string; operatorType: string; customDisplayName?: string; - operatorProperties: Record; + operatorProperties: Record; inputPorts: PortDescription[]; outputPorts: PortDescription[]; } diff --git a/agent-service/src/types/ws/client.ts b/agent-service/src/types/ws/client.ts new file mode 100644 index 00000000000..edc6115b7d2 --- /dev/null +++ b/agent-service/src/types/ws/client.ts @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Client -> server WebSocket frames for this service's protocol +// (/agents/:id/react). Modeled as a discriminated union on `type` so each +// request kind carries only its own fields, rather than one interface with +// everything optional. + +interface WsClientRequestBase { + type: "prompt" | "command"; +} + +// A user prompt to run through the agent. +export interface WsClientRequestPrompt extends WsClientRequestBase { + type: "prompt"; + content: string; + messageSource?: "chat" | "feedback"; +} + +// A control command. Today the only command stops the in-flight run; the +// `commandType` discriminator leaves room for additional commands later. +export interface WsClientRequestStopCommand extends WsClientRequestBase { + type: "command"; + commandType: "stop"; +} + +export type WsClientRequest = WsClientRequestPrompt | WsClientRequestStopCommand; diff --git a/agent-service/src/types/ws/index.ts b/agent-service/src/types/ws/index.ts new file mode 100644 index 00000000000..90f3faac7e0 --- /dev/null +++ b/agent-service/src/types/ws/index.ts @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// WebSocket frames for this service's own protocol (/agents/:id/react): +// inbound client requests and the outbound server messages it pushes back. + +export * from "./client"; +export * from "./server"; diff --git a/agent-service/src/types/ws/server.ts b/agent-service/src/types/ws/server.ts new file mode 100644 index 00000000000..5911981ac0a --- /dev/null +++ b/agent-service/src/types/ws/server.ts @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Server -> client WebSocket frames for this service's protocol +// (/agents/:id/react). Modeled as a discriminated union on `type` so each +// message kind declares exactly the fields it sends. + +import type { ReActStep } from "../agent"; +import type { WorkflowContent } from "../workflow"; + +// Wire projection of an operator's execution result, summarized for the client +// (counts instead of full payloads; only a sample of records). +export interface OperatorResultSummaryWs { + state: string; + inputTuples: number; + outputTuples: number; + inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + outputColumns?: number; + error?: string; + warnings?: string[]; + consoleLogCount?: number; + totalRowCount?: number; + sampleRecords?: Record[]; + resultStatistics?: Record; +} + +type OperatorResults = Record; + +interface WsServerMessageBase { + type: "init" | "step" | "state" | "complete" | "error" | "headChange"; +} + +// Sent once on connect: a snapshot of the agent's current state and steps. +export interface WsServerInitMessage extends WsServerMessageBase { + type: "init"; + state: string; + steps: ReActStep[]; + headId: string; + operatorResults: OperatorResults; +} + +// A single ReAct step streamed as the agent runs. Operator results accompany +// steps that ran tools. +export interface WsServerStepMessage extends WsServerMessageBase { + type: "step"; + step: ReActStep; + operatorResults?: OperatorResults; +} + +// An agent lifecycle transition (e.g. GENERATING, STOPPING). +export interface WsServerStateMessage extends WsServerMessageBase { + type: "state"; + state: string; +} + +// Terminal message for a finished run. +export interface WsServerCompleteMessage extends WsServerMessageBase { + type: "complete"; + state: string; + operatorResults: OperatorResults; +} + +// An error surfaced to the client. +export interface WsServerErrorMessage extends WsServerMessageBase { + type: "error"; + error: string; +} + +// Emitted after a checkout: the head moved, carrying the full step list and the +// workflow snapshot at the new head. +export interface WsServerHeadChangeMessage extends WsServerMessageBase { + type: "headChange"; + headId: string; + steps: ReActStep[]; + workflowContent?: WorkflowContent; + operatorResults: OperatorResults; +} + +export type WsServerMessage = + | WsServerInitMessage + | WsServerStepMessage + | WsServerStateMessage + | WsServerCompleteMessage + | WsServerErrorMessage + | WsServerHeadChangeMessage; diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 462e7679ce5..ec1872a4c6a 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -909,7 +909,7 @@ export class AgentService { } const wsMessage = { - type: "message", + type: "prompt", content: message, messageSource, }; @@ -967,7 +967,7 @@ export class AgentService { if (tracking?.websocket && tracking.websocket.readyState === WebSocket.OPEN) { // Send stop via WebSocket for immediate effect try { - tracking.websocket.send(JSON.stringify({ type: "stop" })); + tracking.websocket.send(JSON.stringify({ type: "command", commandType: "stop" })); } catch (error) { console.error("Failed to send stop command:", error); }