Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions agent-service/src/agent/texera-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import {
type ExecutionConfig,
} from "./tools/workflow-execution-tools";
import { assembleContext } from "./util/context-utils";
import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api";
import { compileWorkflowAsync } from "../api/compile-client";
import type { WorkflowCompilationResponse } from "../types/api";
import { createLogger } from "../logger";
import type { Logger } from "pino";

Expand Down Expand Up @@ -420,7 +421,7 @@ export class TexeraAgent {
}

try {
const { retrieveWorkflow } = await import("../api/workflow-api");
const { retrieveWorkflow } = await import("../api/workflow-client");
const workflow = await retrieveWorkflow(this.delegateConfig.userToken, this.delegateConfig.workflowId);
this.workflowState.setWorkflowContent(workflow.content);
this.log.debug({ workflowId: this.delegateConfig.workflowId }, "refreshed workflow from backend");
Expand Down Expand Up @@ -464,7 +465,7 @@ export class TexeraAgent {
}

try {
const { persistWorkflow } = await import("../api/workflow-api");
const { persistWorkflow } = await import("../api/workflow-client");
const workflowContent = this.workflowState.getWorkflowContent();
await persistWorkflow(
this.delegateConfig.userToken,
Expand Down
2 changes: 1 addition & 1 deletion agent-service/src/agent/tools/workflow-execution-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders
import type { WorkflowState } from "../workflow-state";
import { getBackendConfig } from "../../api/backend-api";
import { env } from "../../config/env";
import type { LogicalPlan, LogicalLink } from "../../api/execution-api";
import type { LogicalPlan, LogicalLink } from "../../types/workflow";
import type { OperatorInfo, SyncExecutionResult } from "../../types/execution";
import { WorkflowSystemMetadata } from "../util/workflow-system-metadata";
import { DEFAULT_AGENT_SETTINGS } from "../../types/agent";
Expand Down
2 changes: 1 addition & 1 deletion agent-service/src/agent/util/context-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai";
import type { WorkflowState } from "../workflow-state";
import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow";
import type { ReActStep } from "../../types/agent";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/api";
import { extractOperatorInputPortSchemaMap } from "./workflow-utils";
import { createLogger } from "../../logger";

Expand Down
3 changes: 2 additions & 1 deletion agent-service/src/agent/util/workflow-system-metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/

import Ajv from "ajv";
import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api";
import { fetchOperatorMetadata } from "../../api/operator-metadata-client";
import type { OperatorSchema, OperatorMetadata } from "../../types/metadata";
import type { ValidationError, Validation } from "../../types/workflow";
import { createLogger } from "../../logger";

Expand Down
50 changes: 0 additions & 50 deletions agent-service/src/api/backend-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,53 +36,3 @@ const currentConfig: BackendConfig = {
export function getBackendConfig(): BackendConfig {
return { ...currentConfig };
}

export interface InputPortInfo {
displayName?: string;
disallowMultiLinks?: boolean;
dependencies?: { id: number; internal: boolean }[];
}

export interface OutputPortInfo {
displayName?: string;
}

interface OperatorAdditionalMetadata {
userFriendlyName: string;
operatorGroupName: string;
operatorDescription?: string;
inputPorts: InputPortInfo[];
outputPorts: OutputPortInfo[];
dynamicInputPorts?: boolean;
dynamicOutputPorts?: boolean;
supportReconfiguration?: boolean;
allowPortCustomization?: boolean;
}

export interface OperatorSchema {
operatorType: string;
jsonSchema: any;
additionalMetadata: OperatorAdditionalMetadata;
operatorVersion: string;
}

interface GroupInfo {
groupName: string;
children?: GroupInfo[] | null;
}

export interface OperatorMetadata {
operators: OperatorSchema[];
groups: GroupInfo[];
}

export async function fetchOperatorMetadata(): Promise<OperatorMetadata> {
const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`;
const response = await fetch(url);

if (!response.ok) {
throw new Error(`Failed to fetch operator metadata: ${response.status} ${response.statusText}`);
}

return (await response.json()) as OperatorMetadata;
}
80 changes: 80 additions & 0 deletions agent-service/src/api/compile-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test, afterEach } from "bun:test";
import { compileWorkflowAsync } from "./compile-client";
import type { LogicalPlan } from "../types/workflow";

const realFetch = globalThis.fetch;

interface CapturedRequest {
url: string;
init: any;
}

let captured: CapturedRequest | undefined;

function mockFetch(responder: (req: CapturedRequest) => Response | Promise<Response>): void {
globalThis.fetch = (async (input: any, init: any) => {
captured = { url: String(input), init };
return responder(captured);
}) as unknown as typeof fetch;
}

afterEach(() => {
globalThis.fetch = realFetch;
captured = undefined;
});

const PLAN: LogicalPlan = {
operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }],
links: [],
};

describe("compileWorkflowAsync", () => {
test("POSTs the plan to the compiling service and returns the parsed response", async () => {
const payload = { operatorOutputSchemas: {}, operatorErrors: {} };
mockFetch(() => new Response(JSON.stringify(payload), { status: 200 }));

const result = await compileWorkflowAsync(PLAN);

expect(result).toEqual(payload);
expect(captured?.url).toBe("http://localhost:9090/api/compile");
expect(captured?.init.method).toBe("POST");
const body = JSON.parse(captured!.init.body);
expect(body.operators).toEqual(PLAN.operators);
expect(body.links).toEqual([]);
expect(body.opsToReuseResult).toEqual([]);
expect(body.opsToViewResult).toEqual([]);
});

test("returns null when the compiling service responds non-OK", async () => {
mockFetch(() => new Response("compile error", { status: 400, statusText: "Bad Request" }));

expect(await compileWorkflowAsync(PLAN)).toBeNull();
});

test("returns null on a network failure", async () => {
mockFetch(() => {
throw new Error("connection refused");
});

expect(await compileWorkflowAsync(PLAN)).toBeNull();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,15 @@
* under the License.
*/

import { getBackendConfig } from "./backend-api";
import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow";
import { getServiceEndpoints } from "../config/endpoints";
import type { LogicalPlan } from "../types/workflow";
import type { WorkflowCompilationResponse } from "../types/api";
import { createLogger } from "../logger";

const log = createLogger("CompileAPI");

export interface SchemaAttribute {
attributeName: string;
attributeType: "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary";
}

export type PortSchema = ReadonlyArray<SchemaAttribute>;

export interface WorkflowFatalError {
type: string;
message: string;
operatorId?: string;
}

export interface WorkflowCompilationResponse {
physicalPlan?: any;
operatorOutputSchemas: Record<string, OperatorPortSchemaMap>;
operatorErrors: Record<string, WorkflowFatalError>;
}
const log = createLogger("CompileClient");

export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise<WorkflowCompilationResponse | null> {
const config = getBackendConfig();
const url = `${config.compileEndpoint}/api/compile`;
const url = `${getServiceEndpoints().compileEndpoint}/api/compile`;

const body = {
operators: logicalPlan.operators,
Expand Down
143 changes: 143 additions & 0 deletions agent-service/src/api/execution-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test, afterEach } from "bun:test";
import { executeWorkflowHttp } from "./execution-client";
import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../types/agent";
import type { LogicalPlan } from "../types/workflow";
import { env } from "../config/env";

const realFetch = globalThis.fetch;

interface CapturedRequest {
url: string;
init: any;
}

let captured: CapturedRequest | undefined;

function mockFetch(responder: (req: CapturedRequest) => Response | Promise<Response>): void {
globalThis.fetch = (async (input: any, init: any) => {
captured = { url: String(input), init };
return responder(captured);
}) as unknown as typeof fetch;
}

function okResponse(body: unknown): Response {
return new Response(JSON.stringify(body), { status: 200, headers: { "Content-Type": "application/json" } });
}

afterEach(() => {
globalThis.fetch = realFetch;
captured = undefined;
});

const PLAN: LogicalPlan = {
operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }],
links: [],
opsToViewResult: ["op1"],
opsToReuseResult: [],
};

const PARAMS: ExecutionRequestParams = {
userToken: "test-token",
workflowId: 42,
computingUnitId: 7,
executionTimeoutMs: 60_000,
maxOperatorResultCharLimit: 1000,
maxOperatorResultCellCharLimit: 500,
};

describe("executeWorkflowHttp", () => {
test("posts to the per-CU execution endpoint with auth and a complete request body", async () => {
mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} }));

const result = await executeWorkflowHttp(PARAMS, PLAN);

expect(result).toEqual({ success: true, state: "Completed", operators: {} });
expect(captured?.url).toBe("http://localhost:8085/api/execution/42/7/run");
expect(captured?.init.method).toBe("POST");
expect(captured?.init.headers.Authorization).toBe("Bearer test-token");

const body = JSON.parse(captured!.init.body);
expect(body.executionName).toBe("agent-execution");
expect(body.logicalPlan.operators).toEqual(PLAN.operators);
expect(body.logicalPlan.opsToReuseResult).toEqual([]);
expect(body.targetOperatorIds).toEqual(["op1"]);
expect(body.timeoutSeconds).toBe(60);
expect(body.maxOperatorResultCharLimit).toBe(1000);
expect(body.maxOperatorResultCellCharLimit).toBe(500);
});

test("defaults computingUnitId to 0 and falls back to default limits/timeout", async () => {
mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} }));

await executeWorkflowHttp({ userToken: "t", workflowId: 42 }, PLAN);

expect(captured?.url).toBe("http://localhost:8085/api/execution/42/0/run");
const body = JSON.parse(captured!.init.body);
expect(body.timeoutSeconds).toBe(Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000));
expect(body.maxOperatorResultCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit);
expect(body.maxOperatorResultCellCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit);
});

test("returns an error-state result when the backend responds non-OK", async () => {
mockFetch(() => new Response("boom", { status: 500, statusText: "Server Error" }));

const result = await executeWorkflowHttp(PARAMS, PLAN);

expect(result.success).toBe(false);
expect(result.state).toBe("Error");
expect(result.operators).toEqual({});
expect(result.errors?.[0]).toContain("500");
expect(result.errors?.[0]).toContain("boom");
});

test("returns an error-state result on a network failure", async () => {
mockFetch(() => {
throw new Error("network down");
});

const result = await executeWorkflowHttp(PARAMS, PLAN);

expect(result).toEqual({ success: false, state: "Error", operators: {}, errors: ["network down"] });
});

test("re-throws AbortError so callers can detect cancellation", async () => {
mockFetch(() => {
const err = new Error("aborted");
err.name = "AbortError";
throw err;
});

await expect(executeWorkflowHttp(PARAMS, PLAN)).rejects.toThrow("aborted");
});

test("uses EXECUTION_ENDPOINT_TEMPLATE with {cuid} substituted when configured", async () => {
const previous = env.EXECUTION_ENDPOINT_TEMPLATE;
(env as any).EXECUTION_ENDPOINT_TEMPLATE = "http://cu-{cuid}.svc:8085";
try {
mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} }));
await executeWorkflowHttp(PARAMS, PLAN);
expect(captured?.url).toBe("http://cu-7.svc:8085/api/execution/42/7/run");
} finally {
(env as any).EXECUTION_ENDPOINT_TEMPLATE = previous;
}
});
});
Loading
Loading