diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..192f28c712d 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -48,7 +48,8 @@ import { type ExecutionConfig, } from "./tools/workflow-execution-tools"; import { assembleContext } from "./util/context-utils"; -import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; +import { compileWorkflowAsync } from "../api/compile-client"; +import type { WorkflowCompilationResponse } from "../types/api"; import { createLogger } from "../logger"; import type { Logger } from "pino"; @@ -420,7 +421,7 @@ export class TexeraAgent { } try { - const { retrieveWorkflow } = await import("../api/workflow-api"); + const { retrieveWorkflow } = await import("../api/workflow-client"); const workflow = await retrieveWorkflow(this.delegateConfig.userToken, this.delegateConfig.workflowId); this.workflowState.setWorkflowContent(workflow.content); this.log.debug({ workflowId: this.delegateConfig.workflowId }, "refreshed workflow from backend"); @@ -464,7 +465,7 @@ export class TexeraAgent { } try { - const { persistWorkflow } = await import("../api/workflow-api"); + const { persistWorkflow } = await import("../api/workflow-client"); const workflowContent = this.workflowState.getWorkflowContent(); await persistWorkflow( this.delegateConfig.userToken, diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..e23e52a06bf 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -23,7 +23,7 @@ import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders import type { WorkflowState } from "../workflow-state"; import { getBackendConfig } from "../../api/backend-api"; import { env } from "../../config/env"; -import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; +import type { LogicalPlan, LogicalLink } from "../../types/workflow"; import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 195692cbf50..61f6ff15c38 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai"; import type { WorkflowState } from "../workflow-state"; import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow"; import type { ReActStep } from "../../types/agent"; -import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api"; +import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/api"; import { extractOperatorInputPortSchemaMap } from "./workflow-utils"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index 9269a0cff7c..692315a1851 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ -18,7 +18,8 @@ */ import Ajv from "ajv"; -import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; +import { fetchOperatorMetadata } from "../../api/operator-metadata-client"; +import type { OperatorSchema, OperatorMetadata } from "../../types/metadata"; import type { ValidationError, Validation } from "../../types/workflow"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/api/backend-api.ts index ffd2c59433f..5c943b8be54 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/api/backend-api.ts @@ -36,53 +36,3 @@ const currentConfig: BackendConfig = { export function getBackendConfig(): BackendConfig { return { ...currentConfig }; } - -export interface InputPortInfo { - displayName?: string; - disallowMultiLinks?: boolean; - dependencies?: { id: number; internal: boolean }[]; -} - -export interface OutputPortInfo { - displayName?: string; -} - -interface OperatorAdditionalMetadata { - userFriendlyName: string; - operatorGroupName: string; - operatorDescription?: string; - inputPorts: InputPortInfo[]; - outputPorts: OutputPortInfo[]; - dynamicInputPorts?: boolean; - dynamicOutputPorts?: boolean; - supportReconfiguration?: boolean; - allowPortCustomization?: boolean; -} - -export interface OperatorSchema { - operatorType: string; - jsonSchema: any; - additionalMetadata: OperatorAdditionalMetadata; - operatorVersion: string; -} - -interface GroupInfo { - groupName: string; - children?: GroupInfo[] | null; -} - -export interface OperatorMetadata { - operators: OperatorSchema[]; - groups: GroupInfo[]; -} - -export async function fetchOperatorMetadata(): Promise { - const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; - const response = await fetch(url); - - if (!response.ok) { - throw new Error(`Failed to fetch operator metadata: ${response.status} ${response.statusText}`); - } - - return (await response.json()) as OperatorMetadata; -} diff --git a/agent-service/src/api/compile-client.test.ts b/agent-service/src/api/compile-client.test.ts new file mode 100644 index 00000000000..4f32867892a --- /dev/null +++ b/agent-service/src/api/compile-client.test.ts @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { compileWorkflowAsync } from "./compile-client"; +import type { LogicalPlan } from "../types/workflow"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response | Promise): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const PLAN: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }], + links: [], +}; + +describe("compileWorkflowAsync", () => { + test("POSTs the plan to the compiling service and returns the parsed response", async () => { + const payload = { operatorOutputSchemas: {}, operatorErrors: {} }; + mockFetch(() => new Response(JSON.stringify(payload), { status: 200 })); + + const result = await compileWorkflowAsync(PLAN); + + expect(result).toEqual(payload); + expect(captured?.url).toBe("http://localhost:9090/api/compile"); + expect(captured?.init.method).toBe("POST"); + const body = JSON.parse(captured!.init.body); + expect(body.operators).toEqual(PLAN.operators); + expect(body.links).toEqual([]); + expect(body.opsToReuseResult).toEqual([]); + expect(body.opsToViewResult).toEqual([]); + }); + + test("returns null when the compiling service responds non-OK", async () => { + mockFetch(() => new Response("compile error", { status: 400, statusText: "Bad Request" })); + + expect(await compileWorkflowAsync(PLAN)).toBeNull(); + }); + + test("returns null on a network failure", async () => { + mockFetch(() => { + throw new Error("connection refused"); + }); + + expect(await compileWorkflowAsync(PLAN)).toBeNull(); + }); +}); diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-client.ts similarity index 68% rename from agent-service/src/api/compile-api.ts rename to agent-service/src/api/compile-client.ts index 8ffd27fd52c..84edaf7368a 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-client.ts @@ -17,34 +17,15 @@ * under the License. */ -import { getBackendConfig } from "./backend-api"; -import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import { getServiceEndpoints } from "../config/endpoints"; +import type { LogicalPlan } from "../types/workflow"; +import type { WorkflowCompilationResponse } from "../types/api"; import { createLogger } from "../logger"; -const log = createLogger("CompileAPI"); - -export interface SchemaAttribute { - attributeName: string; - attributeType: "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary"; -} - -export type PortSchema = ReadonlyArray; - -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - -export interface WorkflowCompilationResponse { - physicalPlan?: any; - operatorOutputSchemas: Record; - operatorErrors: Record; -} +const log = createLogger("CompileClient"); export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { - const config = getBackendConfig(); - const url = `${config.compileEndpoint}/api/compile`; + const url = `${getServiceEndpoints().compileEndpoint}/api/compile`; const body = { operators: logicalPlan.operators, diff --git a/agent-service/src/api/execution-client.test.ts b/agent-service/src/api/execution-client.test.ts new file mode 100644 index 00000000000..0992d3ebe54 --- /dev/null +++ b/agent-service/src/api/execution-client.test.ts @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { executeWorkflowHttp } from "./execution-client"; +import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../types/agent"; +import type { LogicalPlan } from "../types/workflow"; +import { env } from "../config/env"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response | Promise): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +function okResponse(body: unknown): Response { + return new Response(JSON.stringify(body), { status: 200, headers: { "Content-Type": "application/json" } }); +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const PLAN: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }], + links: [], + opsToViewResult: ["op1"], + opsToReuseResult: [], +}; + +const PARAMS: ExecutionRequestParams = { + userToken: "test-token", + workflowId: 42, + computingUnitId: 7, + executionTimeoutMs: 60_000, + maxOperatorResultCharLimit: 1000, + maxOperatorResultCellCharLimit: 500, +}; + +describe("executeWorkflowHttp", () => { + test("posts to the per-CU execution endpoint with auth and a complete request body", async () => { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result).toEqual({ success: true, state: "Completed", operators: {} }); + expect(captured?.url).toBe("http://localhost:8085/api/execution/42/7/run"); + expect(captured?.init.method).toBe("POST"); + expect(captured?.init.headers.Authorization).toBe("Bearer test-token"); + + const body = JSON.parse(captured!.init.body); + expect(body.executionName).toBe("agent-execution"); + expect(body.logicalPlan.operators).toEqual(PLAN.operators); + expect(body.logicalPlan.opsToReuseResult).toEqual([]); + expect(body.targetOperatorIds).toEqual(["op1"]); + expect(body.timeoutSeconds).toBe(60); + expect(body.maxOperatorResultCharLimit).toBe(1000); + expect(body.maxOperatorResultCellCharLimit).toBe(500); + }); + + test("defaults computingUnitId to 0 and falls back to default limits/timeout", async () => { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + + await executeWorkflowHttp({ userToken: "t", workflowId: 42 }, PLAN); + + expect(captured?.url).toBe("http://localhost:8085/api/execution/42/0/run"); + const body = JSON.parse(captured!.init.body); + expect(body.timeoutSeconds).toBe(Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000)); + expect(body.maxOperatorResultCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit); + expect(body.maxOperatorResultCellCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit); + }); + + test("returns an error-state result when the backend responds non-OK", async () => { + mockFetch(() => new Response("boom", { status: 500, statusText: "Server Error" })); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result.success).toBe(false); + expect(result.state).toBe("Error"); + expect(result.operators).toEqual({}); + expect(result.errors?.[0]).toContain("500"); + expect(result.errors?.[0]).toContain("boom"); + }); + + test("returns an error-state result on a network failure", async () => { + mockFetch(() => { + throw new Error("network down"); + }); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result).toEqual({ success: false, state: "Error", operators: {}, errors: ["network down"] }); + }); + + test("re-throws AbortError so callers can detect cancellation", async () => { + mockFetch(() => { + const err = new Error("aborted"); + err.name = "AbortError"; + throw err; + }); + + await expect(executeWorkflowHttp(PARAMS, PLAN)).rejects.toThrow("aborted"); + }); + + test("uses EXECUTION_ENDPOINT_TEMPLATE with {cuid} substituted when configured", async () => { + const previous = env.EXECUTION_ENDPOINT_TEMPLATE; + (env as any).EXECUTION_ENDPOINT_TEMPLATE = "http://cu-{cuid}.svc:8085"; + try { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + await executeWorkflowHttp(PARAMS, PLAN); + expect(captured?.url).toBe("http://cu-7.svc:8085/api/execution/42/7/run"); + } finally { + (env as any).EXECUTION_ENDPOINT_TEMPLATE = previous; + } + }); +}); diff --git a/agent-service/src/api/execution-client.ts b/agent-service/src/api/execution-client.ts new file mode 100644 index 00000000000..08b7bd96efd --- /dev/null +++ b/agent-service/src/api/execution-client.ts @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { getServiceEndpoints } from "../config/endpoints"; +import { env } from "../config/env"; +import type { LogicalPlan } from "../types/workflow"; +import type { SyncExecutionResult } from "../types/execution"; +import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../types/agent"; +import { createLogger } from "../logger"; + +const log = createLogger("ExecutionClient"); + +/** + * POSTs a logical plan to the Workflow Execution Service and returns the + * synchronous run result. Network/abort errors are surfaced; non-abort + * failures are logged and returned as an error-state result. + */ +export async function executeWorkflowHttp( + params: ExecutionRequestParams, + logicalPlan: LogicalPlan, + options: { abortSignal?: AbortSignal } = {} +): Promise { + const workflowId = params.workflowId; + const computingUnitId = params.computingUnitId ?? 0; + + // In k8s each computing unit is a separate pod, so the endpoint varies per cuid. + const executionEndpoint = env.EXECUTION_ENDPOINT_TEMPLATE + ? env.EXECUTION_ENDPOINT_TEMPLATE.replace("{cuid}", String(computingUnitId)) + : getServiceEndpoints().executionEndpoint; + + const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`; + + const timeoutSeconds = params.executionTimeoutMs + ? Math.ceil(params.executionTimeoutMs / 1000) + : Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000); + + const request = { + executionName: "agent-execution", + logicalPlan: { + operators: logicalPlan.operators, + links: logicalPlan.links, + opsToViewResult: logicalPlan.opsToViewResult || [], + opsToReuseResult: [], + }, + targetOperatorIds: logicalPlan.opsToViewResult || [], + timeoutSeconds, + maxOperatorResultCharLimit: params.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: + params.maxOperatorResultCellCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit, + }; + + log.debug( + { + url, + maxOperatorResultCharLimit: request.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: request.maxOperatorResultCellCharLimit, + }, + "executing workflow" + ); + + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${params.userToken}`, + }, + body: JSON.stringify(request), + signal: options.abortSignal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); + } + + return (await response.json()) as SyncExecutionResult; + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + throw error; + } + log.error({ err: error }, "execution failed"); + return { + success: false, + state: "Error", + operators: {}, + errors: [error instanceof Error ? error.message : "Unknown error"], + }; + } +} diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts index eca292d7ffe..2346bb053f5 100644 --- a/agent-service/src/api/index.ts +++ b/agent-service/src/api/index.ts @@ -17,8 +17,7 @@ * under the License. */ -export * from "./backend-api"; -export * from "./execution-api"; -export * from "./workflow-api"; -export * from "./auth-api"; -export * from "./compile-api"; +export * from "./operator-metadata-client"; +export * from "./compile-client"; +export * from "./workflow-client"; +export * from "./execution-client"; diff --git a/agent-service/src/api/operator-metadata-client.test.ts b/agent-service/src/api/operator-metadata-client.test.ts new file mode 100644 index 00000000000..79c77b93856 --- /dev/null +++ b/agent-service/src/api/operator-metadata-client.test.ts @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { fetchOperatorMetadata } from "./operator-metadata-client"; + +const realFetch = globalThis.fetch; +let lastUrl: string | undefined; + +function mockFetch(responder: () => Response): void { + globalThis.fetch = (async (input: any) => { + lastUrl = String(input); + return responder(); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + lastUrl = undefined; +}); + +describe("fetchOperatorMetadata", () => { + test("fetches operator metadata from the Dashboard Service endpoint", async () => { + const payload = { operators: [], groups: [] }; + mockFetch(() => new Response(JSON.stringify(payload), { status: 200 })); + + const result = await fetchOperatorMetadata(); + + expect(result).toEqual(payload); + expect(lastUrl).toBe("http://localhost:8080/api/resources/operator-metadata"); + }); + + test("throws with status text when the response is not OK", async () => { + mockFetch(() => new Response("nope", { status: 503, statusText: "Service Unavailable" })); + + await expect(fetchOperatorMetadata()).rejects.toThrow("Failed to fetch operator metadata: 503"); + }); +}); diff --git a/agent-service/src/api/execution-api.ts b/agent-service/src/api/operator-metadata-client.ts similarity index 62% rename from agent-service/src/api/execution-api.ts rename to agent-service/src/api/operator-metadata-client.ts index 4692a61d754..f5ebb48cd48 100644 --- a/agent-service/src/api/execution-api.ts +++ b/agent-service/src/api/operator-metadata-client.ts @@ -17,22 +17,16 @@ * under the License. */ -export interface LogicalLink { - fromOpId: string; - fromPortId: { id: number; internal: boolean }; - toOpId: string; - toPortId: { id: number; internal: boolean }; -} +import { getServiceEndpoints } from "../config/endpoints"; +import type { OperatorMetadata } from "../types/metadata"; -interface LogicalOperator { - operatorID: string; - operatorType: string; - [key: string]: any; -} +export async function fetchOperatorMetadata(): Promise { + const url = `${getServiceEndpoints().apiEndpoint}/api/resources/operator-metadata`; + const response = await fetch(url); + + if (!response.ok) { + throw new Error(`Failed to fetch operator metadata: ${response.status} ${response.statusText}`); + } -export interface LogicalPlan { - operators: LogicalOperator[]; - links: LogicalLink[]; - opsToViewResult?: string[]; - opsToReuseResult?: string[]; + return (await response.json()) as OperatorMetadata; } diff --git a/agent-service/src/api/workflow-client.test.ts b/agent-service/src/api/workflow-client.test.ts new file mode 100644 index 00000000000..c71f8da1027 --- /dev/null +++ b/agent-service/src/api/workflow-client.test.ts @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { persistWorkflow, retrieveWorkflow } from "./workflow-client"; +import { ExecutionMode, type WorkflowContent } from "../types/workflow"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const CONTENT: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.PIPELINED }, +}; + +describe("retrieveWorkflow", () => { + test("GETs the workflow with a bearer token and parses the stringified content", async () => { + mockFetch( + () => new Response(JSON.stringify({ wid: 5, name: "W", content: JSON.stringify(CONTENT) }), { status: 200 }) + ); + + const workflow = await retrieveWorkflow("tok", 5); + + expect(captured?.url).toBe("http://localhost:8080/api/workflow/5"); + expect(captured?.init.method).toBe("GET"); + expect(captured?.init.headers.Authorization).toBe("Bearer tok"); + expect(workflow.content).toEqual(CONTENT); + }); + + test("throws when the response is not OK", async () => { + mockFetch(() => new Response("missing", { status: 404, statusText: "Not Found" })); + + await expect(retrieveWorkflow("tok", 5)).rejects.toThrow("Failed to retrieve workflow: 404"); + }); +}); + +describe("persistWorkflow", () => { + test("POSTs the workflow with a stringified content payload", async () => { + mockFetch( + () => new Response(JSON.stringify({ wid: 5, name: "W", content: JSON.stringify(CONTENT) }), { status: 200 }) + ); + + const workflow = await persistWorkflow("tok", 5, "W", CONTENT, "desc"); + + expect(captured?.url).toBe("http://localhost:8080/api/workflow/persist"); + expect(captured?.init.method).toBe("POST"); + expect(captured?.init.headers.Authorization).toBe("Bearer tok"); + + const body = JSON.parse(captured!.init.body); + expect(body.wid).toBe(5); + expect(body.name).toBe("W"); + expect(body.description).toBe("desc"); + expect(body.isPublic).toBe(false); + expect(typeof body.content).toBe("string"); + expect(JSON.parse(body.content)).toEqual(CONTENT); + expect(workflow.content).toEqual(CONTENT); + }); + + test("throws when persistence fails", async () => { + mockFetch(() => new Response("bad", { status: 400, statusText: "Bad Request" })); + + await expect(persistWorkflow("tok", 5, "W", CONTENT)).rejects.toThrow("Failed to persist workflow: 400"); + }); +}); diff --git a/agent-service/src/api/workflow-api.ts b/agent-service/src/api/workflow-client.ts similarity index 82% rename from agent-service/src/api/workflow-api.ts rename to agent-service/src/api/workflow-client.ts index 7a96f979a1c..1e4bf932265 100644 --- a/agent-service/src/api/workflow-api.ts +++ b/agent-service/src/api/workflow-client.ts @@ -17,27 +17,10 @@ * under the License. */ -import { getBackendConfig } from "./backend-api"; -import { createAuthHeaders } from "./auth-api"; +import { getServiceEndpoints } from "../config/endpoints"; +import { createAuthHeaders } from "../auth/jwt"; import type { WorkflowContent } from "../types/workflow"; - -export interface Workflow { - wid: number; - name: string; - description?: string; - content: WorkflowContent; - creationTime?: number; - lastModifiedTime?: number; - isPublished?: boolean; -} - -interface WorkflowPersistRequest { - wid?: number; - name: string; - description?: string; - content: string; - isPublic?: boolean; -} +import type { Workflow, WorkflowPersistRequest } from "../types/api"; const WORKFLOW_BASE_URL = "workflow"; @@ -48,7 +31,7 @@ export async function persistWorkflow( content: WorkflowContent, description?: string ): Promise { - const config = getBackendConfig(); + const config = getServiceEndpoints(); const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/persist`; const response = await fetch(url, { @@ -76,7 +59,7 @@ export async function persistWorkflow( } export async function retrieveWorkflow(token: string, wid: number): Promise { - const config = getBackendConfig(); + const config = getServiceEndpoints(); const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/${wid}`; const response = await fetch(url, { diff --git a/agent-service/src/auth/jwt.test.ts b/agent-service/src/auth/jwt.test.ts new file mode 100644 index 00000000000..c5da9b0ef01 --- /dev/null +++ b/agent-service/src/auth/jwt.test.ts @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { extractUserFromToken, validateToken, createAuthHeaders } from "./jwt"; + +function makeToken(payload: Record): string { + const encode = (o: Record) => Buffer.from(JSON.stringify(o)).toString("base64"); + return `${encode({ alg: "none", typ: "JWT" })}.${encode(payload)}.signature`; +} + +const nowSeconds = () => Math.floor(Date.now() / 1000); + +describe("extractUserFromToken", () => { + test("maps JWT claims onto a UserInfo", () => { + const token = makeToken({ userId: 7, sub: "alice", email: "alice@example.com", role: "ADMIN" }); + expect(extractUserFromToken(token)).toEqual({ uid: 7, name: "alice", email: "alice@example.com", role: "ADMIN" }); + }); + + test("defaults missing email and role", () => { + const token = makeToken({ userId: 1, sub: "bob" }); + expect(extractUserFromToken(token)).toEqual({ uid: 1, name: "bob", email: "", role: "REGULAR" }); + }); + + test("throws on a malformed token", () => { + expect(() => extractUserFromToken("not-a-jwt")).toThrow("Failed to decode JWT"); + }); +}); + +describe("validateToken", () => { + test("accepts a token expiring in the future", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() + 3600 }))).toBe(true); + }); + + test("rejects an expired token", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() - 3600 }))).toBe(false); + }); + + test("treats a token without exp as valid", () => { + expect(validateToken(makeToken({ sub: "a" }))).toBe(true); + }); + + test("rejects a malformed token", () => { + expect(validateToken("garbage")).toBe(false); + }); +}); + +describe("createAuthHeaders", () => { + test("builds bearer auth headers", () => { + expect(createAuthHeaders("tok")).toEqual({ Authorization: "Bearer tok", "Content-Type": "application/json" }); + }); +}); diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/auth/jwt.ts similarity index 100% rename from agent-service/src/api/auth-api.ts rename to agent-service/src/auth/jwt.ts diff --git a/agent-service/src/config/endpoints.ts b/agent-service/src/config/endpoints.ts new file mode 100644 index 00000000000..cab8c7ff20c --- /dev/null +++ b/agent-service/src/config/endpoints.ts @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { env } from "./env"; + +/** Base URLs of the backend services this agent service talks to. */ +export interface ServiceEndpoints { + apiEndpoint: string; + modelsEndpoint: string; + compileEndpoint: string; + executionEndpoint: string; +} + +const endpoints: ServiceEndpoints = { + apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, + modelsEndpoint: env.LLM_ENDPOINT, + compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, + executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, +}; + +export function getServiceEndpoints(): ServiceEndpoints { + return { ...endpoints }; +} diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 0da3f693797..76c9358f1b6 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -23,8 +23,8 @@ import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; -import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; -import { retrieveWorkflow } from "./api/workflow-api"; +import { extractBearerToken, extractUserFromToken, validateToken } from "./auth/jwt"; +import { retrieveWorkflow } from "./api/workflow-client"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; import { createLogger } from "./logger"; diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 694b51785fd..6abb788cd5f 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -162,3 +162,13 @@ export interface UpdateAgentSettingsRequest { maxSteps?: number; allowedOperatorTypes?: string[]; } + +/** Parameters for a single workflow-execution request. */ +export interface ExecutionRequestParams { + userToken: string; + workflowId: number; + computingUnitId?: number; + maxOperatorResultCharLimit?: number; + maxOperatorResultCellCharLimit?: number; + executionTimeoutMs?: number; +} diff --git a/agent-service/src/types/api.ts b/agent-service/src/types/api.ts new file mode 100644 index 00000000000..ff10eb1ca87 --- /dev/null +++ b/agent-service/src/types/api.ts @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Wire DTOs: request/response bodies exchanged with backend services and the +// WebSocket frames this service sends to its own clients. Distinct from domain +// types (workflow.ts, execution.ts, agent.ts) which model in-memory state. + +import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; +import type { ReActStep } from "./agent"; + +// --- Dashboard Service: workflow persistence --- + +export interface Workflow { + wid: number; + name: string; + description?: string; + content: WorkflowContent; + creationTime?: number; + lastModifiedTime?: number; + isPublished?: boolean; +} + +export interface WorkflowPersistRequest { + wid?: number; + name: string; + description?: string; + content: string; + isPublic?: boolean; +} + +// --- Workflow Compiling Service --- + +export interface WorkflowFatalError { + message: string; + details: string; + operatorId: string; + workerId: string; + type: { name: string }; + timestamp: { nanos: number; seconds: number }; +} + +export interface WorkflowCompilationResponse { + physicalPlan?: any; + operatorOutputSchemas: Record; + operatorErrors: Record; +} + +// --- This service's WebSocket protocol (/agents/:id/react) --- + +export interface WsMessage { + type: "message" | "stop"; + content?: string; + messageSource?: "chat" | "feedback"; +} + +export interface OperatorResultSummaryWs { + state: string; + inputTuples: number; + outputTuples: number; + inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + outputColumns?: number; + error?: string; + warnings?: string[]; + consoleLogCount?: number; + totalRowCount?: number; + sampleRecords?: Record[]; + resultStatistics?: Record; +} + +export interface WsOutgoingMessage { + type: "step" | "state" | "error" | "complete" | "init" | "headChange"; + step?: ReActStep; + state?: string; + error?: string; + steps?: ReActStep[]; + headId?: string; + operatorResults?: Record; + workflowContent?: any; +} diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index c6d7291e51d..c4ed59255b2 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -19,4 +19,6 @@ export * from "./workflow"; export * from "./execution"; +export * from "./metadata"; export * from "./agent"; +export * from "./api"; diff --git a/agent-service/src/types/metadata.ts b/agent-service/src/types/metadata.ts new file mode 100644 index 00000000000..18ae452a51c --- /dev/null +++ b/agent-service/src/types/metadata.ts @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Operator metadata shapes served by the Dashboard Service +// (`/api/resources/operator-metadata`) and the compact variants the agent +// derives from them for prompts and validation. + +export interface InputPortInfo { + displayName?: string; + disallowMultiLinks?: boolean; + dependencies?: { id: number; internal: boolean }[]; +} + +export interface OutputPortInfo { + displayName?: string; +} + +export interface OperatorAdditionalMetadata { + userFriendlyName: string; + operatorGroupName: string; + operatorDescription?: string; + inputPorts: InputPortInfo[]; + outputPorts: OutputPortInfo[]; + dynamicInputPorts?: boolean; + dynamicOutputPorts?: boolean; + supportReconfiguration?: boolean; + allowPortCustomization?: boolean; +} + +export interface OperatorSchema { + operatorType: string; + jsonSchema: any; + additionalMetadata: OperatorAdditionalMetadata; + operatorVersion: string; +} + +export interface GroupInfo { + groupName: string; + children?: GroupInfo[] | null; +} + +export interface OperatorMetadata { + operators: OperatorSchema[]; + groups: GroupInfo[]; +} + +/** Full per-operator schema slice surfaced to debugging/inspection callers. */ +export interface OperatorSchemaInfo { + properties: any; + required: any; + definitions: any; +} + +/** Reduced operator schema (refs inlined, noise stripped) used in prompts and errors. */ +export interface CompactOperatorSchema { + properties: Record; + required: string[]; +} diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 52c6493cf5f..2622e7f8489 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -104,8 +104,16 @@ export interface CommentBox { readonly height: number; } +export enum ExecutionMode { + PIPELINED = "PIPELINED", + MATERIALIZED = "MATERIALIZED", +} + export interface WorkflowSettings { readonly dataTransferBatchSize: number; + // Optional here; the follow-up server/state PR makes it required and sets it + // in DEFAULT_WORKFLOW_SETTINGS alongside the broader workflow.ts reshaping. + readonly executionMode?: ExecutionMode; } export interface WorkflowContent {