From ca69f8e0020bbb8f79edc92f04b948104ea42ea0 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Wed, 29 Apr 2026 15:13:38 +0000 Subject: [PATCH] feat: decode inline Arrow IPC + warehouse-compat fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serverless warehouses return ARROW_STREAM + INLINE results as base64 Arrow IPC in result.attachment rather than result.data_array. The previous code path discarded inline data for any ARROW_STREAM response (designed for EXTERNAL_LINKS), so these warehouses silently returned empty results. This commit makes the analytics plugin work across classic and serverless warehouses by handling both dispositions for ARROW_STREAM, decoding inline Arrow IPC attachments server-side, and falling back to JSON_ARRAY when a warehouse rejects ARROW_STREAM + INLINE. Changes - Inline Arrow IPC decoding (new arrow-schema.ts) via apache-arrow's tableFromIPC, producing the same row-object shape as JSON_ARRAY regardless of warehouse backend. apache-arrow@21.1.0 added as a server dep. - Format fallback: ARROW_STREAM + INLINE requests automatically fall back to JSON_ARRAY if a classic warehouse rejects them. Explicit format requests are respected without fallback. - Zod-validated SSE wire protocol for /api/analytics/query (shared schema between server and client; malformed payloads surface a clear error instead of silent undefined). - Default remains JSON_ARRAY for compatibility. Stack: layer 3 of 3 carved from #256. - #327 — coverage backfill (layer 1) - #328 — AnalyticsFormat rename to API enum names (layer 2) - (this PR) — the actual fix Fixes #242 Co-authored-by: Isaac --- docs/docs/api/appkit/Class.ExecutionError.md | 30 +- packages/appkit-ui/src/js/sse/connect-sse.ts | 5 +- .../src/react/charts/__tests__/types.test.ts | 2 +- packages/appkit-ui/src/react/charts/types.ts | 6 +- .../__tests__/use-analytics-query.test.ts | 143 +++++ .../hooks/__tests__/use-chart-data.test.ts | 24 +- packages/appkit-ui/src/react/hooks/types.ts | 10 +- .../src/react/hooks/use-analytics-query.ts | 98 +++- .../src/react/hooks/use-chart-data.ts | 10 +- packages/appkit/package.json | 6 +- .../connectors/sql-warehouse/arrow-schema.ts | 441 +++++++++++++++ .../src/connectors/sql-warehouse/client.ts | 114 +++- .../sql-warehouse/tests/arrow-schema.test.ts | 514 ++++++++++++++++++ .../sql-warehouse/tests/client.test.ts | 382 +++++++++++++ packages/appkit/src/errors/execution.ts | 32 +- .../appkit/src/plugins/analytics/analytics.ts | 197 +++++-- .../plugins/analytics/tests/analytics.test.ts | 449 +++++++++++++++ packages/appkit/src/stream/defaults.ts | 8 +- .../src/type-generator/query-registry.ts | 97 +++- .../tests/query-registry.test.ts | 137 ++++- packages/appkit/src/type-generator/types.ts | 2 + packages/shared/package.json | 3 +- packages/shared/src/index.ts | 1 + packages/shared/src/sse/analytics.test.ts | 87 +++ packages/shared/src/sse/analytics.ts | 95 ++++ pnpm-lock.yaml | 53 +- 26 files changed, 2823 insertions(+), 123 deletions(-) create mode 100644 packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts create mode 100644 packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts create mode 100644 packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts create mode 100644 packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts create mode 100644 packages/shared/src/sse/analytics.test.ts create mode 100644 packages/shared/src/sse/analytics.ts diff --git a/docs/docs/api/appkit/Class.ExecutionError.md b/docs/docs/api/appkit/Class.ExecutionError.md index 75886c4dc..eacfdc23d 100644 --- a/docs/docs/api/appkit/Class.ExecutionError.md +++ b/docs/docs/api/appkit/Class.ExecutionError.md @@ -22,6 +22,7 @@ throw new ExecutionError("Statement was canceled"); new ExecutionError(message: string, options?: { cause?: Error; context?: Record; + errorCode?: string; }): ExecutionError; ``` @@ -30,15 +31,16 @@ new ExecutionError(message: string, options?: { | Parameter | Type | | ------ | ------ | | `message` | `string` | -| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; \} | +| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; `errorCode?`: `string`; \} | | `options.cause?` | `Error` | | `options.context?` | `Record`\<`string`, `unknown`\> | +| `options.errorCode?` | `string` | #### Returns `ExecutionError` -#### Inherited from +#### Overrides [`AppKitError`](Class.AppKitError.md).[`constructor`](Class.AppKitError.md#constructor) @@ -86,6 +88,19 @@ Additional context for the error *** +### errorCode? + +```ts +readonly optional errorCode: string; +``` + +Structured error code from the upstream source (typically the warehouse's +`error_code` for statement-level failures, or the SDK's `ApiError.errorCode` +for HTTP failures). Preserved through wrapping so callers can branch on a +stable identifier without substring-matching the message. + +*** + ### isRetryable ```ts @@ -202,16 +217,17 @@ Create an execution error for closed/expired results ### statementFailed() ```ts -static statementFailed(errorMessage?: string): ExecutionError; +static statementFailed(errorMessage?: string, errorCode?: string): ExecutionError; ``` -Create an execution error for statement failure +Create an execution error for statement failure. #### Parameters -| Parameter | Type | -| ------ | ------ | -| `errorMessage?` | `string` | +| Parameter | Type | Description | +| ------ | ------ | ------ | +| `errorMessage?` | `string` | Human-readable error from the warehouse / SDK. | +| `errorCode?` | `string` | Structured code (e.g. "INVALID_PARAMETER_VALUE") to preserve through wrapping. Optional. | #### Returns diff --git a/packages/appkit-ui/src/js/sse/connect-sse.ts b/packages/appkit-ui/src/js/sse/connect-sse.ts index c4fd4500d..089ddf579 100644 --- a/packages/appkit-ui/src/js/sse/connect-sse.ts +++ b/packages/appkit-ui/src/js/sse/connect-sse.ts @@ -18,7 +18,10 @@ export async function connectSSE( lastEventId: initialLastEventId = null, retryDelay = 2000, maxRetries = 3, - maxBufferSize = 1024 * 1024, // 1MB + // 8 MiB — sized to receive inline Arrow IPC attachments from + // ARROW_STREAM analytics responses; matches the server's stream + // `maxEventSize`. Most events are well under 1 MiB in practice. + maxBufferSize = 8 * 1024 * 1024, timeout = 300000, // 5 minutes onError, } = options; diff --git a/packages/appkit-ui/src/react/charts/__tests__/types.test.ts b/packages/appkit-ui/src/react/charts/__tests__/types.test.ts index 13394dcf6..d6685ce01 100644 --- a/packages/appkit-ui/src/react/charts/__tests__/types.test.ts +++ b/packages/appkit-ui/src/react/charts/__tests__/types.test.ts @@ -93,7 +93,7 @@ describe("isQueryProps", () => { const props = { queryKey: "test_query", parameters: { limit: 100 }, - format: "json" as const, + format: "json_array" as const, }; expect(isQueryProps(props as any)).toBe(true); diff --git a/packages/appkit-ui/src/react/charts/types.ts b/packages/appkit-ui/src/react/charts/types.ts index 65804a741..ec8a15dc2 100644 --- a/packages/appkit-ui/src/react/charts/types.ts +++ b/packages/appkit-ui/src/react/charts/types.ts @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow"; // ============================================================================ /** Supported data formats for analytics queries */ -export type DataFormat = "json" | "arrow" | "auto"; +export type DataFormat = "json_array" | "arrow_stream" | "auto"; /** Chart orientation */ export type Orientation = "vertical" | "horizontal"; @@ -77,8 +77,8 @@ export interface QueryProps extends ChartBaseProps { parameters?: Record; /** * Data format to use - * - "json": Use JSON format (smaller payloads, simpler) - * - "arrow": Use Arrow format (faster for large datasets) + * - "json_array": Use JSON format (smaller payloads, simpler) + * - "arrow_stream": Use Arrow format (faster for large datasets) * - "auto": Automatically select based on expected data size * @default "auto" */ diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts new file mode 100644 index 000000000..65de7d10a --- /dev/null +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts @@ -0,0 +1,143 @@ +import { renderHook, waitFor } from "@testing-library/react"; +import { beforeEach, describe, expect, test, vi } from "vitest"; + +// Capture the onMessage handler so tests can drive SSE messages directly. +let lastConnectArgs: any = null; +const mockProcessArrowBuffer = vi.fn(); +const mockFetchArrow = vi.fn(); + +vi.mock("@/js", () => ({ + connectSSE: vi.fn((args: any) => { + lastConnectArgs = args; + return () => {}; + }), + ArrowClient: { + fetchArrow: (...args: unknown[]) => mockFetchArrow(...args), + processArrowBuffer: (...args: unknown[]) => mockProcessArrowBuffer(...args), + }, +})); + +// useQueryHMR is a no-op shim for tests; mock to avoid HMR side effects. +vi.mock("../use-query-hmr", () => ({ + useQueryHMR: vi.fn(), +})); + +import { useAnalyticsQuery } from "../use-analytics-query"; + +describe("useAnalyticsQuery", () => { + beforeEach(() => { + vi.clearAllMocks(); + lastConnectArgs = null; + }); + + test("decodes arrow_inline base64 attachment via ArrowClient.processArrowBuffer", async () => { + const fakeTable = { numRows: 1, schema: { fields: [] } }; + mockProcessArrowBuffer.mockResolvedValueOnce(fakeTable); + + // 'AQID' decodes to bytes [1, 2, 3]. + const base64 = "AQID"; + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + // Drive the SSE onMessage handler with an arrow_inline payload. + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow_inline", attachment: base64 }), + }); + + await waitFor(() => { + expect(result.current.data).toBe(fakeTable); + }); + + expect(mockProcessArrowBuffer).toHaveBeenCalledTimes(1); + const passedBuffer = mockProcessArrowBuffer.mock.calls[0][0] as Uint8Array; + expect(passedBuffer).toBeInstanceOf(Uint8Array); + expect(Array.from(passedBuffer)).toEqual([1, 2, 3]); + // Inline path must NOT trigger a network fetch. + expect(mockFetchArrow).not.toHaveBeenCalled(); + }); + + test("surfaces an error when arrow_inline decode fails", async () => { + mockProcessArrowBuffer.mockRejectedValueOnce(new Error("bad ipc")); + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow_inline", attachment: "AQID" }), + }); + + await waitFor(() => { + expect(result.current.error).toBe( + "Unable to load data, please try again", + ); + }); + expect(result.current.loading).toBe(false); + }); + + test("rejects arrow_inline with missing/empty/non-string attachment without crashing atob", async () => { + const cases: Array = [undefined, null, "", 123, { foo: "bar" }]; + + for (const attachment of cases) { + mockProcessArrowBuffer.mockClear(); + const { result, unmount } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow_inline", attachment }), + }); + + await waitFor(() => { + expect(result.current.error).toBe( + "Unable to load data, please try again", + ); + }); + // Critically: must NOT call processArrowBuffer (or atob) on the bad input. + expect(mockProcessArrowBuffer).not.toHaveBeenCalled(); + + unmount(); + } + }); + + test("rejects oversized arrow_inline attachment without allocating a huge buffer", async () => { + // Base64 string that would decode to ~9 MiB (>8 MiB cap). The hook + // should reject before calling decodeBase64 / processArrowBuffer. + const oversized = "A".repeat(13 * 1024 * 1024); + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow_inline", attachment: oversized }), + }); + + await waitFor(() => { + expect(result.current.error).toBe( + "Unable to load data, please try again", + ); + }); + expect(mockProcessArrowBuffer).not.toHaveBeenCalled(); + }); + + test("still handles type:result rows for JSON_ARRAY", async () => { + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "JSON_ARRAY" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ + type: "result", + data: [{ id: 1 }, { id: 2 }], + }), + }); + + await waitFor(() => { + expect(result.current.data).toEqual([{ id: 1 }, { id: 2 }]); + }); + expect(mockProcessArrowBuffer).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts index a4d99a916..686aff317 100644 --- a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts @@ -72,7 +72,7 @@ describe("useChartData", () => { }); describe("format selection", () => { - test("uses JSON format when explicitly specified", () => { + test("uses JSON_ARRAY format when explicitly specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -82,7 +82,7 @@ describe("useChartData", () => { renderHook(() => useChartData({ queryKey: "test", - format: "json", + format: "json_array", }), ); @@ -93,7 +93,7 @@ describe("useChartData", () => { ); }); - test("uses ARROW format when explicitly specified", () => { + test("uses ARROW_STREAM format when explicitly specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -103,7 +103,7 @@ describe("useChartData", () => { renderHook(() => useChartData({ queryKey: "test", - format: "arrow", + format: "arrow_stream", }), ); @@ -114,7 +114,7 @@ describe("useChartData", () => { ); }); - test("auto-selects ARROW for large limit", () => { + test("auto-selects ARROW_STREAM for large limit", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -136,7 +136,7 @@ describe("useChartData", () => { ); }); - test("auto-selects ARROW for date range queries", () => { + test("auto-selects ARROW_STREAM for date range queries", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -205,7 +205,7 @@ describe("useChartData", () => { ); }); - test("auto-selects JSON by default when no heuristics match", () => { + test("auto-selects JSON_ARRAY by default when no heuristics match", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -227,7 +227,7 @@ describe("useChartData", () => { ); }); - test("defaults to auto format (JSON) when format is not specified", () => { + test("defaults to auto format (JSON_ARRAY) when format is not specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -353,7 +353,7 @@ describe("useChartData", () => { expect(result.current.isArrow).toBe(false); }); - test("isArrow reflects requested ARROW format when data is null", () => { + test("isArrow reflects requested ARROW_STREAM format when data is null", () => { mockUseAnalyticsQuery.mockReturnValue({ data: null, loading: true, @@ -361,13 +361,13 @@ describe("useChartData", () => { }); const { result } = renderHook(() => - useChartData({ queryKey: "test", format: "arrow" }), + useChartData({ queryKey: "test", format: "arrow_stream" }), ); expect(result.current.isArrow).toBe(true); }); - test("isArrow reflects requested JSON format when data is null", () => { + test("isArrow reflects requested JSON_ARRAY format when data is null", () => { mockUseAnalyticsQuery.mockReturnValue({ data: null, loading: true, @@ -375,7 +375,7 @@ describe("useChartData", () => { }); const { result } = renderHook(() => - useChartData({ queryKey: "test", format: "json" }), + useChartData({ queryKey: "test", format: "json_array" }), ); expect(result.current.isArrow).toBe(false); diff --git a/packages/appkit-ui/src/react/hooks/types.ts b/packages/appkit-ui/src/react/hooks/types.ts index e5e178c90..6ee08fe54 100644 --- a/packages/appkit-ui/src/react/hooks/types.ts +++ b/packages/appkit-ui/src/react/hooks/types.ts @@ -32,8 +32,10 @@ export interface TypedArrowTable< // ============================================================================ /** Options for configuring an analytics SSE query */ -export interface UseAnalyticsQueryOptions { - /** Response format - "JSON_ARRAY" returns typed arrays, "ARROW_STREAM" returns TypedArrowTable */ +export interface UseAnalyticsQueryOptions< + F extends AnalyticsFormat = "JSON_ARRAY", +> { + /** Response format - "JSON_ARRAY" (default) returns typed arrays, "ARROW_STREAM" uses Arrow (inline or external links) */ format?: F; /** Maximum size of serialized parameters in bytes */ @@ -120,7 +122,9 @@ export type InferResultByFormat< T, K, F extends AnalyticsFormat, -> = F extends "ARROW_STREAM" ? TypedArrowTable> : InferResult; +> = F extends "ARROW_STREAM" + ? TypedArrowTable> + : InferResult; /** * Infers parameters type from QueryRegistry[K]["parameters"] diff --git a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts index 0bd0b2f02..5d18a2eed 100644 --- a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts @@ -1,4 +1,5 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { AnalyticsSseMessage } from "shared"; import { ArrowClient, connectSSE } from "@/js"; import type { AnalyticsFormat, @@ -22,6 +23,29 @@ function getArrowStreamUrl(id: string) { return `/api/analytics/arrow-result/${id}`; } +/** + * Client-side defensive cap on inline Arrow IPC attachments (8 MiB decoded). + * Mirrors the server's MAX_INLINE_ATTACHMENT_BYTES so a misconfigured proxy + * (or a future server bug) can't push us into allocating an unbounded + * Uint8Array and hanging the browser. + * + * REMOVE THIS GUARD if PR #320 (stash + serve via /arrow-result) lands — + * that proposal eliminates the arrow_inline SSE path entirely, so bulk + * bytes flow over HTTP where the browser handles backpressure natively + * and Content-Length is exposed up-front. + */ +const MAX_INLINE_ATTACHMENT_BYTES = 8 * 1024 * 1024; + +/** Decode a base64 string into a Uint8Array suitable for Arrow IPC parsing. */ +function decodeBase64(b64: string): Uint8Array { + const binary = atob(b64); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} + /** * Subscribe to an analytics query over SSE and returns its latest result. * Integration hook between client and analytics plugin. @@ -39,13 +63,13 @@ function getArrowStreamUrl(id: string) { * @param options - Analytics query settings including format * @returns Query result state with format-appropriate data type * - * @example JSON format (default) + * @example JSON_ARRAY format (default) * ```typescript * const { data } = useAnalyticsQuery("spend_data", params); * // data: Array<{ group_key: string; cost_usd: number; ... }> | null * ``` * - * @example Arrow format + * @example ARROW_STREAM format * ```typescript * const { data } = useAnalyticsQuery("spend_data", params, { format: "ARROW_STREAM" }); * // data: TypedArrowTable<{ group_key: string; cost_usd: number; ... }> | null @@ -120,20 +144,28 @@ export function useAnalyticsQuery< signal: abortController.signal, onMessage: async (message) => { try { - const parsed = JSON.parse(message.data); + const rawParsed = JSON.parse(message.data); + + // The error/code branch below predates the SSE wire schema and + // can fire for messages that don't match any AnalyticsSseMessage + // variant (e.g. server-side error events from executeStream). + // Try schema validation first; if it fails, fall through to the + // generic error/code handling below. + const validated = AnalyticsSseMessage.safeParse(rawParsed); + const msg = validated.success ? validated.data : null; // success - JSON format - if (parsed.type === "result") { + if (msg?.type === "result") { setLoading(false); - setData(parsed.data as ResultType); + setData(msg.data as ResultType); return; } - // success - Arrow format - if (parsed.type === "arrow") { + // success - Arrow format (external links: fetch from server) + if (msg?.type === "arrow") { try { const arrowData = await ArrowClient.fetchArrow( - getArrowStreamUrl(parsed.statement_id), + getArrowStreamUrl(msg.statement_id), ); const table = await ArrowClient.processArrowBuffer(arrowData); setLoading(false); @@ -151,6 +183,44 @@ export function useAnalyticsQuery< } } + // success - Arrow format (inline: decode base64 IPC payload locally) + if (msg?.type === "arrow_inline") { + // Schema already enforced non-empty string; just check size. + // base64 length L decodes to ~L*3/4 bytes; reject before + // allocating a multi-MiB Uint8Array. + const decodedSize = Math.ceil((msg.attachment.length * 3) / 4); + if (decodedSize > MAX_INLINE_ATTACHMENT_BYTES) { + console.error( + "[useAnalyticsQuery] arrow_inline attachment exceeds %d bytes (got %d)", + MAX_INLINE_ATTACHMENT_BYTES, + decodedSize, + ); + setLoading(false); + setError("Unable to load data, please try again"); + return; + } + try { + const buffer = decodeBase64(msg.attachment); + const table = await ArrowClient.processArrowBuffer(buffer); + setLoading(false); + setData(table as ResultType); + return; + } catch (error) { + console.error( + "[useAnalyticsQuery] Failed to decode inline Arrow data", + error, + ); + setLoading(false); + setError("Unable to load data, please try again"); + return; + } + } + + // The schema didn't match — fall through to error/code handling + // below for legacy error events or surface a malformed-payload + // error if no error fields are present. + const parsed = rawParsed; + // error if (parsed.type === "error" || parsed.error || parsed.code) { const errorMsg = @@ -166,6 +236,18 @@ export function useAnalyticsQuery< } return; } + + // The payload matched neither AnalyticsSseMessage nor an error + // event — surface a generic error rather than silently dropping it. + if (!validated.success) { + console.error( + "[useAnalyticsQuery] Malformed SSE payload", + validated.error.flatten(), + ); + setLoading(false); + setError("Unable to load data, please try again"); + return; + } } catch (error) { console.warn("[useAnalyticsQuery] Malformed message received", error); } diff --git a/packages/appkit-ui/src/react/hooks/use-chart-data.ts b/packages/appkit-ui/src/react/hooks/use-chart-data.ts index a90481a2e..ec4b2d4ee 100644 --- a/packages/appkit-ui/src/react/hooks/use-chart-data.ts +++ b/packages/appkit-ui/src/react/hooks/use-chart-data.ts @@ -17,8 +17,8 @@ export interface UseChartDataOptions { parameters?: Record; /** * Data format preference - * - "json": Force JSON format - * - "arrow": Force Arrow format + * - "json_array": Force JSON format + * - "arrow_stream": Force Arrow format * - "auto": Auto-select based on heuristics * @default "auto" */ @@ -52,8 +52,8 @@ function resolveFormat( parameters?: Record, ): "JSON_ARRAY" | "ARROW_STREAM" { // Explicit format selection - if (format === "json") return "JSON_ARRAY"; - if (format === "arrow") return "ARROW_STREAM"; + if (format === "json_array") return "JSON_ARRAY"; + if (format === "arrow_stream") return "ARROW_STREAM"; // Auto-selection heuristics if (format === "auto") { @@ -97,7 +97,7 @@ function resolveFormat( * // Force Arrow format * const { data } = useChartData({ * queryKey: "big_query", - * format: "arrow" + * format: "arrow_stream" * }); * ``` */ diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 89eeb8b21..04232f88b 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -1,7 +1,7 @@ { "name": "@databricks/appkit", "type": "module", - "version": "0.27.0", + "version": "0.26.0", "main": "./dist/index.js", "types": "./dist/index.d.ts", "packageManager": "pnpm@10.21.0", @@ -69,6 +69,7 @@ "@opentelemetry/sdk-trace-base": "2.6.0", "@opentelemetry/semantic-conventions": "1.38.0", "@types/semver": "7.7.1", + "apache-arrow": "21.1.0", "dotenv": "16.6.1", "express": "4.22.0", "obug": "2.1.1", @@ -77,8 +78,7 @@ "semver": "7.7.3", "shared": "workspace:*", "vite": "npm:rolldown-vite@7.1.14", - "ws": "8.18.3", - "zod": "4.3.6" + "ws": "8.18.3" }, "devDependencies": { "@types/express": "4.17.25", diff --git a/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts b/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts new file mode 100644 index 000000000..17d099e37 --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts @@ -0,0 +1,441 @@ +import { + Binary, + Bool, + type DataType, + DateDay, + Decimal, + DurationMicrosecond, + Field, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + IntervalYearMonth, + List, + Map_, + Null, + Schema, + Struct, + Table, + TimestampMicrosecond, + tableToIPC, + Utf8, +} from "apache-arrow"; + +/** + * Parse a Databricks SQL type text (the value returned by the Statement + * Execution API in `ColumnInfo.type_text`) into an Apache Arrow DataType. + * + * Supports: + * - All scalar types (STRING, INT, BIGINT, DECIMAL, TIMESTAMP, etc.) + * - Parameterized scalars: DECIMAL(p,s), VARCHAR(n), CHAR(n) + * - Nested types: ARRAY, MAP, STRUCT + * - INTERVAL year-month and day-time variants + * - Backtick-quoted struct field names with embedded `` `` `` escapes + * + * Unknown or unparseable types fall back to Utf8 — empty-Table consumers + * still see a column with the right name; only the inner type is degraded. + */ +export function parseDatabricksType(typeText: string): DataType { + const parser = new TypeParser(typeText); + const result = parser.parseType(); + parser.expectEnd(); + return result; +} + +/** + * Build an empty Arrow IPC stream (base64-encoded) matching the column schema + * returned by the warehouse. Used so ARROW_STREAM responses with no rows still + * deliver a real Arrow Table to the client, preserving the hook's typed + * contract. + */ +export function buildEmptyArrowIPCBase64( + columns: Array<{ + name?: string; + type_text?: string; + type_name?: string; + }>, +): string { + const fields = columns.map((col, index) => { + const typeText = col.type_text ?? col.type_name ?? "STRING"; + let dataType: DataType; + try { + dataType = parseDatabricksType(typeText); + } catch { + dataType = new Utf8(); + } + const name = col.name && col.name.length > 0 ? col.name : `column_${index}`; + return new Field(name, dataType, true); + }); + const schema = new Schema(fields); + const table = new Table(schema); + const ipc = tableToIPC(table, "stream"); + return Buffer.from(ipc).toString("base64"); +} + +// ============================================================================ +// Recursive-descent parser +// ============================================================================ + +class TypeParser { + private readonly input: string; + private pos = 0; + + constructor(input: string) { + this.input = input; + } + + parseType(): DataType { + this.skipWs(); + + let name: string; + if (this.peek() === "`") { + name = this.consumeBacktickIdent(); + } else { + name = this.consumeIdent(); + } + const upper = name.toUpperCase(); + + this.skipWs(); + + if (upper === "INTERVAL") { + return this.parseInterval(); + } + + if (this.peek() === "(") { + this.consume("("); + const args = this.parseNumberArgs(); + this.consume(")"); + this.skipWs(); + return this.makeParameterized(upper, args); + } + + if (this.peek() === "<") { + this.consume("<"); + const result = this.makeGeneric(upper); + this.skipWs(); + this.consume(">"); + return result; + } + + return this.makeScalar(upper); + } + + expectEnd(): void { + this.skipWs(); + if (this.pos < this.input.length) { + throw new Error( + `Unexpected trailing input at position ${this.pos}: "${this.input.slice(this.pos)}"`, + ); + } + } + + // ─── Type constructors ─────────────────────────────────── + + private makeScalar(upper: string): DataType { + switch (upper) { + case "STRING": + case "VARIANT": + return new Utf8(); + case "VARCHAR": + case "CHAR": + return new Utf8(); + case "BINARY": + case "GEOGRAPHY": + case "GEOMETRY": + return new Binary(); + case "BOOLEAN": + case "BOOL": + return new Bool(); + case "TINYINT": + case "BYTE": + return new Int8(); + case "SMALLINT": + case "SHORT": + return new Int16(); + case "INT": + case "INTEGER": + return new Int32(); + case "BIGINT": + case "LONG": + return new Int64(); + case "FLOAT": + case "REAL": + return new Float32(); + case "DOUBLE": + return new Float64(); + case "DECIMAL": + case "NUMERIC": + case "DEC": + return new Decimal(0, 10, 128); + case "DATE": + return new DateDay(); + case "TIMESTAMP": + case "TIMESTAMP_LTZ": + return new TimestampMicrosecond("UTC"); + case "TIMESTAMP_NTZ": + return new TimestampMicrosecond(); + case "VOID": + case "NULL": + return new Null(); + default: + return new Utf8(); + } + } + + private makeParameterized(upper: string, args: number[]): DataType { + switch (upper) { + case "DECIMAL": + case "NUMERIC": + case "DEC": { + const precision = args[0] ?? 10; + const scale = args[1] ?? 0; + // Arrow JS Decimal constructor signature is (scale, precision, bitWidth). + return new Decimal(scale, precision, 128); + } + case "VARCHAR": + case "CHAR": + return new Utf8(); + default: + return new Utf8(); + } + } + + private makeGeneric(upper: string): DataType { + switch (upper) { + case "ARRAY": { + const inner = this.parseType(); + return new List(new Field("item", inner, true)); + } + case "MAP": { + const keyType = this.parseType(); + this.skipWs(); + this.consume(","); + this.skipWs(); + const valueType = this.parseType(); + const entriesStruct = new Struct([ + new Field("key", keyType, false), + new Field("value", valueType, true), + ]); + return new Map_(new Field("entries", entriesStruct, false), false); + } + case "STRUCT": + return this.parseStructFields(); + default: + // Unknown generic — skip to matching '>' and fall back. + this.skipBalancedAngles(); + return new Utf8(); + } + } + + private parseStructFields(): DataType { + const fields: Field[] = []; + while (true) { + this.skipWs(); + if (this.peek() === ">") break; + + let name: string; + if (this.peek() === "`") { + name = this.consumeBacktickIdent(); + } else { + name = this.consumeIdent(); + } + + this.skipWs(); + this.consume(":"); + this.skipWs(); + + const type = this.parseType(); + + // Optional `NOT NULL` and `COMMENT '...'`. Both are accepted by + // Databricks DDL and may appear in `type_text`. + this.skipWs(); + while (this.peekKeyword("NOT")) { + this.consumeIdent(); + this.skipWs(); + if (this.peekKeyword("NULL")) { + this.consumeIdent(); + } + this.skipWs(); + } + if (this.peekKeyword("COMMENT")) { + this.consumeIdent(); + this.skipWs(); + this.consumeStringLiteral(); + this.skipWs(); + } + + fields.push(new Field(name, type, true)); + + this.skipWs(); + if (this.peek() === ",") { + this.consume(","); + } else { + break; + } + } + return new Struct(fields); + } + + private parseInterval(): DataType { + // Grammar: INTERVAL [TO ] + // YEAR / MONTH variants -> IntervalYearMonth + // DAY / HOUR / MINUTE / SECOND variants -> Duration(microsecond) + const seen: string[] = []; + while (this.pos < this.input.length) { + this.skipWs(); + const c = this.peek(); + if (c === "" || c === "," || c === ">" || c === ")") break; + const word = this.consumeIdent().toUpperCase(); + seen.push(word); + } + const isYearMonth = seen.some((w) => w === "YEAR" || w === "MONTH"); + return isYearMonth ? new IntervalYearMonth() : new DurationMicrosecond(); + } + + private parseNumberArgs(): number[] { + const args: number[] = []; + while (true) { + this.skipWs(); + if (this.peek() === ")") break; + args.push(this.consumeNumber()); + this.skipWs(); + if (this.peek() === ",") { + this.consume(","); + } else { + break; + } + } + return args; + } + + // ─── Token utilities ───────────────────────────────────── + + private peek(): string { + return this.input[this.pos] ?? ""; + } + + private peekKeyword(word: string): boolean { + const slice = this.input.slice(this.pos, this.pos + word.length); + if (slice.toUpperCase() !== word.toUpperCase()) return false; + // Must be followed by a non-identifier character (boundary check). + const next = this.input[this.pos + word.length] ?? ""; + return !/[A-Za-z0-9_]/.test(next); + } + + private consume(expected: string): void { + if (this.peek() !== expected) { + throw new Error( + `Expected "${expected}" at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + this.pos++; + } + + private skipWs(): void { + while ( + this.pos < this.input.length && + /\s/.test(this.input[this.pos] ?? "") + ) { + this.pos++; + } + } + + private consumeIdent(): string { + const start = this.pos; + while ( + this.pos < this.input.length && + /[A-Za-z0-9_]/.test(this.input[this.pos] ?? "") + ) { + this.pos++; + } + if (this.pos === start) { + throw new Error( + `Expected identifier at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + return this.input.slice(start, this.pos); + } + + private consumeBacktickIdent(): string { + this.consume("`"); + let value = ""; + while (this.pos < this.input.length) { + if (this.input[this.pos] === "`") { + if (this.input[this.pos + 1] === "`") { + value += "`"; + this.pos += 2; + continue; + } + break; + } + value += this.input[this.pos]; + this.pos++; + } + this.consume("`"); + return value; + } + + private consumeNumber(): number { + const start = this.pos; + while ( + this.pos < this.input.length && + /[0-9]/.test(this.input[this.pos] ?? "") + ) { + this.pos++; + } + if (this.pos === start) { + throw new Error( + `Expected number at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + return Number.parseInt(this.input.slice(start, this.pos), 10); + } + + private consumeStringLiteral(): string { + const quote = this.peek(); + if (quote !== "'" && quote !== '"') { + throw new Error( + `Expected string literal at position ${this.pos}, got "${quote}" in "${this.input}"`, + ); + } + this.pos++; + let value = ""; + while (this.pos < this.input.length) { + const c = this.input[this.pos]; + if (c === "\\") { + // Escape sequence: keep the next char verbatim. + const next = this.input[this.pos + 1]; + if (next !== undefined) { + value += next; + this.pos += 2; + continue; + } + this.pos++; + continue; + } + if (c === quote) { + this.pos++; + return value; + } + value += c; + this.pos++; + } + throw new Error(`Unterminated string literal in "${this.input}"`); + } + + private skipBalancedAngles(): void { + let depth = 1; + while (this.pos < this.input.length && depth > 0) { + const c = this.peek(); + if (c === "<") depth++; + else if (c === ">") { + depth--; + if (depth === 0) return; + } + this.pos++; + } + } +} diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index d0a1c1816..4ae439416 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -21,10 +21,24 @@ import { SpanStatusCode, TelemetryManager, } from "../../telemetry"; +import { buildEmptyArrowIPCBase64 } from "./arrow-schema"; import { executeStatementDefaults } from "./defaults"; const logger = createLogger("connectors:sql-warehouse"); +/** + * Maximum size for inline Arrow IPC attachments (8 MiB decoded). + * Aligned with `streamDefaults.maxEventSize` so anything that would exceed + * the SSE event cap fails here with a clear error rather than a confusing + * "Buffer size exceeded" downstream. Larger results should use + * `disposition: "EXTERNAL_LINKS"`, which the analytics fallback handles. + * + * RAISE TO 25 MiB (Databricks API hard cap on INLINE) if PR #320 (stash + + * serve via /arrow-result) lands — that proposal moves bulk bytes off SSE + * onto HTTP, so the SSE event-size constraint no longer applies here. + */ +const MAX_INLINE_ATTACHMENT_BYTES = 8 * 1024 * 1024; + interface SQLWarehouseConfig { timeout?: number; telemetry?: TelemetryOptions; @@ -196,7 +210,10 @@ export class SQLWarehouseConnector { result = this._transformDataArray(response); break; case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); + throw ExecutionError.statementFailed( + status.error?.message, + status.error?.error_code, + ); case "CANCELED": throw ExecutionError.canceled(); case "CLOSED": @@ -236,18 +253,22 @@ export class SQLWarehouseConnector { code: SpanStatusCode.ERROR, message: error instanceof Error ? error.message : String(error), }); - - logger.error( - "Statement execution failed: %s", - error instanceof Error ? error.message : String(error), - ); } if (error instanceof AppKitError) { throw error; } + // Preserve the SDK's structured ApiError.errorCode (e.g. + // "INVALID_PARAMETER_VALUE", "BAD_REQUEST") through the wrap so + // callers can branch on a stable identifier rather than + // substring-matching the message. + const sdkErrorCode = + error && typeof error === "object" && "errorCode" in error + ? (error as { errorCode?: unknown }).errorCode + : undefined; throw ExecutionError.statementFailed( error instanceof Error ? error.message : String(error), + typeof sdkErrorCode === "string" ? sdkErrorCode : undefined, ); } finally { // remove abort handler @@ -360,7 +381,10 @@ export class SQLWarehouseConnector { span.setStatus({ code: SpanStatusCode.OK }); return this._transformDataArray(response); case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); + throw ExecutionError.statementFailed( + status.error?.message, + status.error?.error_code, + ); case "CANCELED": throw ExecutionError.canceled(); case "CLOSED": @@ -382,12 +406,16 @@ export class SQLWarehouseConnector { message: error instanceof Error ? error.message : String(error), }); - // error logging is handled by executeStatement's catch block (gated on isAborted) if (error instanceof AppKitError) { throw error; } + const sdkErrorCode = + error && typeof error === "object" && "errorCode" in error + ? (error as { errorCode?: unknown }).errorCode + : undefined; throw ExecutionError.statementFailed( error instanceof Error ? error.message : String(error), + typeof sdkErrorCode === "string" ? sdkErrorCode : undefined, ); } finally { span.end(); @@ -399,7 +427,40 @@ export class SQLWarehouseConnector { private _transformDataArray(response: sql.StatementResponse) { if (response.manifest?.format === "ARROW_STREAM") { - return this.updateWithArrowStatus(response); + const result = response.result as + | (sql.ResultData & { attachment?: string }) + | undefined; + + // Inline Arrow: pass the base64 IPC attachment through unmodified so + // the analytics route can stream it to the client, where the existing + // ArrowClient infrastructure decodes it into a Table. Validate size + // here to fail fast on runaway payloads. + if (result?.attachment) { + return this._validateArrowAttachment(response, result.attachment); + } + + // External links: data fetched separately via statement_id. + if (result?.external_links) { + return this.updateWithArrowStatus(response); + } + + // Empty result with a known schema: synthesize a zero-row Arrow IPC + // attachment so the client always receives an Arrow Table for + // ARROW_STREAM, regardless of whether the warehouse returned data. + if (!result?.data_array && response.manifest?.schema?.columns) { + const synthesized = buildEmptyArrowIPCBase64( + response.manifest.schema.columns, + ); + return { + ...response, + result: { ...(result ?? {}), attachment: synthesized }, + }; + } + + // Inline data_array under ARROW_STREAM (rare): fall through to the + // row transform below. The hook will receive `type: "result"` rows; + // callers asking for ARROW_STREAM should not hit this path with + // current Databricks warehouses. } if (!response.result?.data_array || !response.manifest?.schema?.columns) { @@ -445,6 +506,41 @@ export class SQLWarehouseConnector { }; } + /** + * Validate (but do not decode) a base64 Arrow IPC attachment. + * Some serverless warehouses return inline results as Arrow IPC in + * `result.attachment`. We pass the base64 string through to the client, + * which decodes it into an Arrow Table via the existing ArrowClient + * infrastructure. This keeps the wire contract for ARROW_STREAM + * consistent (client always receives an Arrow Table) and avoids + * decode/re-encode work on the server. + */ + private _validateArrowAttachment( + response: sql.StatementResponse, + attachment: string, + ) { + // Cap the size to protect against unbounded inline payloads from + // misbehaving warehouses. 64 MiB is well above the typical inline limit + // (~25 MiB hard cap on the API) but bounds memory if a server returns + // a runaway response. + // + // Strip whitespace (rare but legal in base64) and account for trailing + // `=` padding so the byte count is exact rather than an upper bound. + const stripped = attachment.replace(/\s+/g, ""); + const padding = stripped.endsWith("==") + ? 2 + : stripped.endsWith("=") + ? 1 + : 0; + const decodedSize = Math.floor((stripped.length * 3) / 4) - padding; + if (decodedSize > MAX_INLINE_ATTACHMENT_BYTES) { + throw ExecutionError.statementFailed( + `Inline Arrow attachment exceeds maximum size (${decodedSize} > ${MAX_INLINE_ATTACHMENT_BYTES} bytes)`, + ); + } + return response; + } + private updateWithArrowStatus(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts new file mode 100644 index 000000000..e30b7315a --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts @@ -0,0 +1,514 @@ +import { + Binary, + Bool, + type DataType, + DateDay, + Decimal, + DurationMicrosecond, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + IntervalYearMonth, + List, + Map_, + Null, + Struct, + TimestampMicrosecond, + Type, + tableFromIPC, + Utf8, +} from "apache-arrow"; +import { describe, expect, test } from "vitest"; +import { buildEmptyArrowIPCBase64, parseDatabricksType } from "../arrow-schema"; + +// ============================================================================ +// Helpers +// ============================================================================ + +/** Walk the type tree and produce a stable string representation for assertions. */ +function typeSummary(t: DataType): string { + if (t instanceof Decimal) return `Decimal(${t.precision},${t.scale})`; + if (t instanceof TimestampMicrosecond) { + const tz = (t as TimestampMicrosecond & { timezone?: string }).timezone; + return tz ? `Timestamp[us,${tz}]` : "Timestamp[us]"; + } + if (t instanceof List) { + const inner = (t.children?.[0]?.type as DataType | undefined) ?? new Utf8(); + return `List<${typeSummary(inner)}>`; + } + if (t instanceof Struct) { + const inner = (t.children ?? []) + .map((f) => `${f.name}:${typeSummary(f.type as DataType)}`) + .join(","); + return `Struct<${inner}>`; + } + if (t instanceof Map_) { + const entries = + (t.children?.[0]?.type as Struct | undefined)?.children ?? []; + const k = entries[0]?.type as DataType | undefined; + const v = entries[1]?.type as DataType | undefined; + return `Map<${typeSummary(k ?? new Utf8())},${typeSummary(v ?? new Utf8())}>`; + } + // Fall back to typeId for primitives. + return Type[t.typeId] ?? t.constructor.name; +} + +// ============================================================================ +// Scalar types +// ============================================================================ + +describe("parseDatabricksType — scalars", () => { + test.each([ + ["STRING", Utf8], + ["VARIANT", Utf8], + ["BINARY", Binary], + ["GEOGRAPHY", Binary], + ["GEOMETRY", Binary], + ["BOOLEAN", Bool], + ["BOOL", Bool], + ["TINYINT", Int8], + ["BYTE", Int8], + ["SMALLINT", Int16], + ["SHORT", Int16], + ["INT", Int32], + ["INTEGER", Int32], + ["BIGINT", Int64], + ["LONG", Int64], + ["FLOAT", Float32], + ["REAL", Float32], + ["DOUBLE", Float64], + ["DATE", DateDay], + ["VOID", Null], + ["NULL", Null], + ] as const)("%s parses to expected type", (input, ctor) => { + const t = parseDatabricksType(input); + expect(t).toBeInstanceOf(ctor); + }); + + test("case-insensitive — lowercase is accepted", () => { + expect(parseDatabricksType("string")).toBeInstanceOf(Utf8); + expect(parseDatabricksType("bigint")).toBeInstanceOf(Int64); + }); + + test("TIMESTAMP defaults to UTC tz", () => { + const t = parseDatabricksType("TIMESTAMP") as TimestampMicrosecond; + expect(t).toBeInstanceOf(TimestampMicrosecond); + expect(t.timezone).toBe("UTC"); + }); + + test("TIMESTAMP_LTZ behaves like TIMESTAMP", () => { + const t = parseDatabricksType("TIMESTAMP_LTZ") as TimestampMicrosecond; + expect(t.timezone).toBe("UTC"); + }); + + test("TIMESTAMP_NTZ has no timezone", () => { + const t = parseDatabricksType("TIMESTAMP_NTZ") as TimestampMicrosecond; + expect(t).toBeInstanceOf(TimestampMicrosecond); + expect(t.timezone == null || t.timezone === "").toBe(true); + }); + + test("Unknown scalar falls back to Utf8 (degraded but doesn't throw)", () => { + expect(parseDatabricksType("SOMETHING_NEW")).toBeInstanceOf(Utf8); + }); +}); + +// ============================================================================ +// Parameterized scalars +// ============================================================================ + +describe("parseDatabricksType — parameterized scalars", () => { + test("VARCHAR(255) → Utf8 (Arrow doesn't track string length)", () => { + expect(parseDatabricksType("VARCHAR(255)")).toBeInstanceOf(Utf8); + }); + + test("CHAR(10) → Utf8", () => { + expect(parseDatabricksType("CHAR(10)")).toBeInstanceOf(Utf8); + }); + + test("DECIMAL(10,2) → Decimal(precision=10, scale=2)", () => { + const t = parseDatabricksType("DECIMAL(10,2)") as Decimal; + expect(t).toBeInstanceOf(Decimal); + expect(t.precision).toBe(10); + expect(t.scale).toBe(2); + }); + + test("DECIMAL(38,0) — max precision, no scale", () => { + const t = parseDatabricksType("DECIMAL(38,0)") as Decimal; + expect(t.precision).toBe(38); + expect(t.scale).toBe(0); + }); + + test("NUMERIC(p,s) is an alias for DECIMAL(p,s)", () => { + const t = parseDatabricksType("NUMERIC(15,4)") as Decimal; + expect(t).toBeInstanceOf(Decimal); + expect(t.precision).toBe(15); + expect(t.scale).toBe(4); + }); + + test("DEC(p,s) is an alias for DECIMAL(p,s)", () => { + const t = parseDatabricksType("DEC(7,3)") as Decimal; + expect(t.precision).toBe(7); + expect(t.scale).toBe(3); + }); + + test("DECIMAL with whitespace inside parens", () => { + const t = parseDatabricksType("DECIMAL( 10 , 2 )") as Decimal; + expect(t.precision).toBe(10); + expect(t.scale).toBe(2); + }); + + test("DECIMAL with single arg (precision only) defaults scale=0", () => { + const t = parseDatabricksType("DECIMAL(20)") as Decimal; + expect(t.precision).toBe(20); + expect(t.scale).toBe(0); + }); + + test("Bare DECIMAL falls back to default precision/scale", () => { + const t = parseDatabricksType("DECIMAL") as Decimal; + expect(t).toBeInstanceOf(Decimal); + expect(typeof t.precision).toBe("number"); + expect(typeof t.scale).toBe("number"); + }); +}); + +// ============================================================================ +// INTERVAL types +// ============================================================================ + +describe("parseDatabricksType — INTERVAL", () => { + test("INTERVAL YEAR → IntervalYearMonth", () => { + expect(parseDatabricksType("INTERVAL YEAR")).toBeInstanceOf( + IntervalYearMonth, + ); + }); + + test("INTERVAL MONTH → IntervalYearMonth", () => { + expect(parseDatabricksType("INTERVAL MONTH")).toBeInstanceOf( + IntervalYearMonth, + ); + }); + + test("INTERVAL YEAR TO MONTH → IntervalYearMonth", () => { + expect(parseDatabricksType("INTERVAL YEAR TO MONTH")).toBeInstanceOf( + IntervalYearMonth, + ); + }); + + test("INTERVAL DAY → DurationMicrosecond", () => { + expect(parseDatabricksType("INTERVAL DAY")).toBeInstanceOf( + DurationMicrosecond, + ); + }); + + test("INTERVAL DAY TO SECOND → DurationMicrosecond", () => { + expect(parseDatabricksType("INTERVAL DAY TO SECOND")).toBeInstanceOf( + DurationMicrosecond, + ); + }); + + test("INTERVAL HOUR TO MINUTE → DurationMicrosecond", () => { + expect(parseDatabricksType("INTERVAL HOUR TO MINUTE")).toBeInstanceOf( + DurationMicrosecond, + ); + }); +}); + +// ============================================================================ +// ARRAY +// ============================================================================ + +describe("parseDatabricksType — ARRAY", () => { + test("ARRAY → List", () => { + const t = parseDatabricksType("ARRAY") as List; + expect(t).toBeInstanceOf(List); + expect(t.children?.[0]?.type).toBeInstanceOf(Utf8); + }); + + test("ARRAY → List", () => { + const t = parseDatabricksType("ARRAY") as List; + expect(t.children?.[0]?.type).toBeInstanceOf(Int32); + }); + + test("ARRAY preserves precision/scale", () => { + const t = parseDatabricksType("ARRAY") as List; + const inner = t.children?.[0]?.type as Decimal; + expect(inner).toBeInstanceOf(Decimal); + expect(inner.precision).toBe(10); + expect(inner.scale).toBe(2); + }); + + test("ARRAY> — nested twice", () => { + const t = parseDatabricksType("ARRAY>") as List; + const inner1 = t.children?.[0]?.type as List; + expect(inner1).toBeInstanceOf(List); + expect(inner1.children?.[0]?.type).toBeInstanceOf(Int32); + }); + + test("ARRAY>> — three levels deep", () => { + expect( + typeSummary(parseDatabricksType("ARRAY>>")), + ).toBe("List>>"); + }); + + test("ARRAY with whitespace", () => { + const t = parseDatabricksType("ARRAY < STRING >") as List; + expect(t.children?.[0]?.type).toBeInstanceOf(Utf8); + }); +}); + +// ============================================================================ +// MAP +// ============================================================================ + +describe("parseDatabricksType — MAP", () => { + test("MAP", () => { + expect(typeSummary(parseDatabricksType("MAP"))).toBe( + "Map", + ); + }); + + test("MAP — with whitespace", () => { + expect(typeSummary(parseDatabricksType("MAP"))).toBe( + "Map", + ); + }); + + test("MAP> — value is nested", () => { + expect(typeSummary(parseDatabricksType("MAP>"))).toBe( + "Map>", + ); + }); + + test("MAP> — fully nested", () => { + expect( + typeSummary(parseDatabricksType("MAP>")), + ).toBe("Map>"); + }); +}); + +// ============================================================================ +// STRUCT +// ============================================================================ + +describe("parseDatabricksType — STRUCT", () => { + test("STRUCT", () => { + const t = parseDatabricksType("STRUCT") as Struct; + expect(t).toBeInstanceOf(Struct); + expect(t.children?.length).toBe(2); + expect(t.children?.[0]?.name).toBe("a"); + expect(t.children?.[0]?.type).toBeInstanceOf(Int32); + expect(t.children?.[1]?.name).toBe("b"); + expect(t.children?.[1]?.type).toBeInstanceOf(Utf8); + }); + + test("STRUCT with whitespace and many fields", () => { + const t = parseDatabricksType( + "STRUCT", + ) as Struct; + expect(t.children?.map((f) => f.name)).toEqual(["id", "name", "ts"]); + expect(t.children?.[0]?.type).toBeInstanceOf(Int64); + expect(t.children?.[2]?.type).toBeInstanceOf(TimestampMicrosecond); + }); + + test("STRUCT with COMMENT on a field", () => { + const t = parseDatabricksType( + "STRUCT", + ) as Struct; + expect(t.children?.length).toBe(2); + expect(t.children?.[0]?.name).toBe("id"); + expect(t.children?.[0]?.type).toBeInstanceOf(Int32); + expect(t.children?.[1]?.name).toBe("name"); + }); + + test("STRUCT with COMMENT containing escaped quote", () => { + const t = parseDatabricksType( + "STRUCT", + ) as Struct; + expect(t.children?.length).toBe(2); + expect(t.children?.[0]?.name).toBe("id"); + }); + + test("STRUCT with NOT NULL annotation on a field", () => { + const t = parseDatabricksType( + "STRUCT", + ) as Struct; + expect(t.children?.length).toBe(2); + expect(t.children?.[0]?.name).toBe("id"); + }); + + test("STRUCT with backticked field name", () => { + const t = parseDatabricksType( + "STRUCT<`weird name`:INT, normal:STRING>", + ) as Struct; + expect(t.children?.[0]?.name).toBe("weird name"); + expect(t.children?.[0]?.type).toBeInstanceOf(Int32); + }); + + test("STRUCT with backticked field name containing escaped backtick", () => { + const t = parseDatabricksType( + "STRUCT<`with``tick`:INT, other:STRING>", + ) as Struct; + expect(t.children?.[0]?.name).toBe("with`tick"); + }); + + test("STRUCT with nested STRUCT", () => { + const t = parseDatabricksType( + "STRUCT, name:STRING>", + ) as Struct; + expect(t.children?.length).toBe(2); + const nested = t.children?.[0]?.type as Struct; + expect(nested).toBeInstanceOf(Struct); + expect(nested.children?.[0]?.name).toBe("inner"); + expect(nested.children?.[0]?.type).toBeInstanceOf(Int32); + }); + + test("Empty STRUCT<>", () => { + const t = parseDatabricksType("STRUCT<>") as Struct; + expect(t).toBeInstanceOf(Struct); + expect(t.children?.length).toBe(0); + }); +}); + +// ============================================================================ +// Deep nesting / mixed types +// ============================================================================ + +describe("parseDatabricksType — deeply nested", () => { + test("MAP>>", () => { + expect( + typeSummary( + parseDatabricksType( + "MAP>>", + ), + ), + ).toBe("Map>>"); + }); + + test("ARRAY>>> — 4 levels mixed", () => { + expect( + typeSummary( + parseDatabricksType( + "ARRAY>>>", + ), + ), + ).toBe("List>>>"); + }); +}); + +// ============================================================================ +// Error / robustness behavior +// ============================================================================ + +describe("parseDatabricksType — error / robustness", () => { + test("trailing garbage throws", () => { + expect(() => parseDatabricksType("INT junk")).toThrow(); + }); + + test("unmatched < throws", () => { + expect(() => parseDatabricksType("ARRAY { + expect(() => parseDatabricksType("DECIMAL(10,2")).toThrow(); + }); + + test("empty string throws", () => { + expect(() => parseDatabricksType("")).toThrow(); + }); +}); + +// ============================================================================ +// buildEmptyArrowIPCBase64 — round-trip +// ============================================================================ + +describe("buildEmptyArrowIPCBase64", () => { + test("produces a decodable empty Arrow Table with the right schema", () => { + const columns = [ + { name: "user_id", type_text: "BIGINT" }, + { name: "name", type_text: "STRING" }, + { name: "created_at", type_text: "TIMESTAMP" }, + { name: "balance", type_text: "DECIMAL(10,2)" }, + { name: "active", type_text: "BOOLEAN" }, + ]; + const b64 = buildEmptyArrowIPCBase64(columns); + const buf = Buffer.from(b64, "base64"); + const table = tableFromIPC(buf); + expect(table.numRows).toBe(0); + expect(table.numCols).toBe(5); + expect(table.schema.fields.map((f) => f.name)).toEqual([ + "user_id", + "name", + "created_at", + "balance", + "active", + ]); + expect( + (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth, + ).toBe(64); + expect(table.schema.fields[1]?.type).toBeInstanceOf(Utf8); + // After IPC round-trip Arrow JS resolves Timestamp* subclasses to a + // generic Timestamp with `unit` and `timezone`; assert structurally. + expect(table.schema.fields[2]?.type.typeId).toBe(Type.Timestamp); + expect((table.schema.fields[2]?.type as { unit?: number }).unit).toBe(2); // TimeUnit.MICROSECOND + const decimal = table.schema.fields[3]?.type as Decimal; + expect(decimal).toBeInstanceOf(Decimal); + expect(decimal.precision).toBe(10); + expect(decimal.scale).toBe(2); + expect(table.schema.fields[4]?.type).toBeInstanceOf(Bool); + }); + + test("round-trips nested types end-to-end", () => { + const columns = [ + { name: "tags", type_text: "ARRAY" }, + { name: "meta", type_text: "STRUCT" }, + { name: "counts", type_text: "MAP" }, + ]; + const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64"); + const table = tableFromIPC(buf); + expect(table.numRows).toBe(0); + expect(table.numCols).toBe(3); + expect(table.schema.fields[0]?.type).toBeInstanceOf(List); + expect(table.schema.fields[1]?.type).toBeInstanceOf(Struct); + expect(table.schema.fields[2]?.type).toBeInstanceOf(Map_); + }); + + test("falls back from type_text to type_name when type_text missing", () => { + const columns = [{ name: "id", type_name: "BIGINT" }]; + const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64"); + const table = tableFromIPC(buf); + expect( + (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth, + ).toBe(64); + }); + + test("unknown type degrades to Utf8 without throwing", () => { + const columns = [ + { name: "id", type_text: "BIGINT" }, + { name: "weird", type_text: "FUTURE_TYPE_NOT_YET_SUPPORTED" }, + ]; + const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64"); + const table = tableFromIPC(buf); + expect( + (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth, + ).toBe(64); + expect(table.schema.fields[1]?.type).toBeInstanceOf(Utf8); + }); + + test("missing column name gets a synthesized placeholder", () => { + const columns = [{ type_text: "STRING" }, { name: "", type_text: "INT" }]; + const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64"); + const table = tableFromIPC(buf); + expect(table.schema.fields[0]?.name).toBe("column_0"); + expect(table.schema.fields[1]?.name).toBe("column_1"); + }); + + test("empty schema produces a valid 0-column 0-row Table", () => { + const buf = Buffer.from(buildEmptyArrowIPCBase64([]), "base64"); + const table = tableFromIPC(buf); + expect(table.numRows).toBe(0); + expect(table.numCols).toBe(0); + }); +}); diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts new file mode 100644 index 000000000..c7f73c988 --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts @@ -0,0 +1,382 @@ +import type { sql } from "@databricks/sdk-experimental"; +import { tableFromIPC } from "apache-arrow"; +import { describe, expect, test, vi } from "vitest"; + +vi.mock("../../../telemetry", () => { + const mockMeter = { + createCounter: () => ({ add: vi.fn() }), + createHistogram: () => ({ record: vi.fn() }), + }; + return { + TelemetryManager: { + getProvider: () => ({ + startActiveSpan: vi.fn(), + getMeter: () => mockMeter, + }), + }, + SpanKind: { CLIENT: 1 }, + SpanStatusCode: { ERROR: 2 }, + }; +}); +vi.mock("../../../logging/logger", () => ({ + createLogger: () => ({ + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + event: () => null, + }), +})); +vi.mock("../../../stream/arrow-stream-processor", () => ({ + ArrowStreamProcessor: vi.fn(), +})); + +import { SQLWarehouseConnector } from "../client"; + +function createConnector() { + return new SQLWarehouseConnector({ timeout: 30000 }); +} + +// Real base64 Arrow IPC from a serverless warehouse returning +// `SELECT 1 AS test_col, 2 AS test_col2` with INLINE + ARROW_STREAM. +// Contains schema (two INT columns) + one record batch with values [1, 2]. +const REAL_ARROW_ATTACHMENT = + "/////7gAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAABMAAAABAAAAMz///8QAAAAGAAAAAAAAQIUAAAAvP///yAAAAAAAAABAAAAAAkAAAB0ZXN0X2NvbDIAAAAQABQAEAAOAA8ABAAAAAgAEAAAABgAAAAgAAAAAAABAhwAAAAIAAwABAALAAgAAAAgAAAAAAAAAQAAAAAIAAAAdGVzdF9jb2wAAAAA/////7gAAAAQAAAADAAaABgAFwAEAAgADAAAACAAAAAAAQAAAAAAAAAAAAAAAAADBAAKABgADAAIAAQACgAAADwAAAAQAAAAAQAAAAAAAAAAAAAAAgAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAEAAAAAAAAAQAAAAAAAAAAEAAAAAAAAAIAAAAAAAAAAAQAAAAAAAADAAAAAAAAAAAQAAAAAAAAA/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAP////8AAAAA"; + +describe("SQLWarehouseConnector._transformDataArray", () => { + describe("classic warehouse (JSON_ARRAY + INLINE)", () => { + test("transforms data_array rows into named objects", () => { + const connector = createConnector(); + // Real response shape from classic warehouse: INLINE + JSON_ARRAY + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + column_count: 2, + columns: [ + { + name: "test_col", + type_text: "INT", + type_name: "INT", + position: 0, + }, + { + name: "test_col2", + type_text: "INT", + type_name: "INT", + position: 1, + }, + ], + }, + total_row_count: 1, + truncated: false, + }, + result: { + data_array: [["1", "2"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([{ test_col: "1", test_col2: "2" }]); + expect(result.result.data_array).toBeUndefined(); + }); + + test("parses JSON strings in STRING columns", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "metadata", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [["1", '{"key":"value"}']], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data[0].metadata).toEqual({ key: "value" }); + }); + }); + + describe("classic warehouse (EXTERNAL_LINKS + ARROW_STREAM)", () => { + test("returns statement_id for external links fetch", () => { + const connector = createConnector(); + // Real response shape from classic warehouse: EXTERNAL_LINKS + ARROW_STREAM + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + external_links: [ + { + external_link: "https://storage.example.com/chunk0", + expiration: "2026-04-15T00:00:00Z", + }, + ], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.statement_id).toBe("stmt-1"); + expect(result.result.data).toBeUndefined(); + }); + }); + + describe("serverless warehouse (INLINE + ARROW_STREAM with attachment)", () => { + test("passes attachment through unchanged for client-side decoding", () => { + const connector = createConnector(); + // Real response shape from serverless warehouse: INLINE + ARROW_STREAM + // Data arrives in result.attachment as base64-encoded Arrow IPC, not data_array. + const response = { + statement_id: "00000001-test-stmt", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + column_count: 2, + columns: [ + { + name: "test_col", + type_text: "INT", + type_name: "INT", + position: 0, + }, + { + name: "test_col2", + type_text: "INT", + type_name: "INT", + position: 1, + }, + ], + total_chunk_count: 1, + chunks: [{ chunk_index: 0, row_offset: 0, row_count: 1 }], + total_row_count: 1, + }, + truncated: false, + }, + result: { + chunk_index: 0, + row_offset: 0, + row_count: 1, + attachment: REAL_ARROW_ATTACHMENT, + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT); + expect(result.result.data).toBeUndefined(); + // Preserves other result fields + expect(result.result.row_count).toBe(1); + }); + + test("preserves manifest and status alongside attachment", () => { + const connector = createConnector(); + const response = { + statement_id: "00000001-test-stmt", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + chunk_index: 0, + row_count: 1, + attachment: REAL_ARROW_ATTACHMENT, + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + // Manifest, statement_id, and attachment are all preserved + expect(result.manifest.format).toBe("ARROW_STREAM"); + expect(result.statement_id).toBe("00000001-test-stmt"); + expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT); + }); + + test("synthesizes an empty Arrow IPC attachment for empty results so the client always gets a Table", () => { + const connector = createConnector(); + // Empty result: no attachment, no data_array, no external_links — but + // the manifest still describes the schema. The connector should fill in + // `attachment` with a zero-row Arrow IPC matching the schema. + const response = { + statement_id: "stmt-empty", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "user_id", type_text: "BIGINT", type_name: "BIGINT" }, + { name: "name", type_text: "STRING", type_name: "STRING" }, + { + name: "balance", + type_text: "DECIMAL(10,2)", + type_name: "DECIMAL", + }, + ], + }, + total_row_count: 0, + }, + result: {}, + } as unknown as sql.StatementResponse; + + const transformed = (connector as any)._transformDataArray(response); + const attachment: string = transformed.result.attachment; + expect(typeof attachment).toBe("string"); + expect(attachment.length).toBeGreaterThan(0); + + // Verify the synthesized attachment decodes into the right empty schema. + const table = tableFromIPC(Buffer.from(attachment, "base64")); + expect(table.numRows).toBe(0); + expect(table.schema.fields.map((f) => f.name)).toEqual([ + "user_id", + "name", + "balance", + ]); + }); + + test("does NOT synthesize an attachment when external_links are present", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-ext", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { columns: [{ name: "x", type_text: "INT" }] }, + }, + result: { + external_links: [ + { external_link: "https://example.com/x", expiration: "9999" }, + ], + }, + } as unknown as sql.StatementResponse; + + const transformed = (connector as any)._transformDataArray(response); + // External-links path returns the statement_id projection — no attachment. + expect(transformed.result.attachment).toBeUndefined(); + expect(transformed.result.statement_id).toBe("stmt-ext"); + }); + + test("does NOT synthesize an attachment when schema is missing", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-no-schema", + status: { state: "SUCCEEDED" }, + manifest: { format: "ARROW_STREAM" }, + result: {}, + } as unknown as sql.StatementResponse; + + const transformed = (connector as any)._transformDataArray(response); + // Without a schema we cannot build a Table — pass through unchanged. + expect(transformed.result?.attachment).toBeUndefined(); + }); + + test("rejects oversized attachments to bound memory", () => { + const connector = createConnector(); + // 8 MiB decoded cap → ~12 MiB of base64 chars decodes to >8 MiB. + const oversized = "A".repeat(12 * 1024 * 1024); + const response = { + statement_id: "stmt-oversized", + status: { state: "SUCCEEDED" }, + manifest: { format: "ARROW_STREAM" }, + result: { attachment: oversized }, + } as unknown as sql.StatementResponse; + + expect(() => (connector as any)._transformDataArray(response)).toThrow( + /exceeds maximum size/, + ); + }); + }); + + describe("ARROW_STREAM with data_array (hypothetical inline variant)", () => { + test("transforms data_array like JSON_ARRAY path", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "value", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [ + ["1", "hello"], + ["2", "world"], + ], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([ + { id: "1", value: "hello" }, + { id: "2", value: "world" }, + ]); + }); + }); + + describe("edge cases", () => { + test("returns response unchanged when no data_array, attachment, or schema", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { format: "JSON_ARRAY" }, + result: {}, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result).toBe(response); + }); + + test("attachment takes priority over data_array when both present", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + attachment: REAL_ARROW_ATTACHMENT, + data_array: [["999", "999"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + // Should pass attachment through (client decodes), not transform data_array + expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT); + expect(result.result.data).toBeUndefined(); + }); + }); +}); diff --git a/packages/appkit/src/errors/execution.ts b/packages/appkit/src/errors/execution.ts index 42de77043..1e6d1f5f6 100644 --- a/packages/appkit/src/errors/execution.ts +++ b/packages/appkit/src/errors/execution.ts @@ -16,13 +16,39 @@ export class ExecutionError extends AppKitError { readonly isRetryable = false; /** - * Create an execution error for statement failure + * Structured error code from the upstream source (typically the warehouse's + * `error_code` for statement-level failures, or the SDK's `ApiError.errorCode` + * for HTTP failures). Preserved through wrapping so callers can branch on a + * stable identifier without substring-matching the message. */ - static statementFailed(errorMessage?: string): ExecutionError { + readonly errorCode?: string; + + constructor( + message: string, + options?: { + cause?: Error; + context?: Record; + errorCode?: string; + }, + ) { + super(message, options); + this.errorCode = options?.errorCode; + } + + /** + * Create an execution error for statement failure. + * @param errorMessage Human-readable error from the warehouse / SDK. + * @param errorCode Structured code (e.g. "INVALID_PARAMETER_VALUE") to + * preserve through wrapping. Optional. + */ + static statementFailed( + errorMessage?: string, + errorCode?: string, + ): ExecutionError { const message = errorMessage ? `Statement failed: ${errorMessage}` : "Statement failed: Unknown error"; - return new ExecutionError(message); + return new ExecutionError(message, { errorCode }); } /** diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index 7bc75da60..07d08cf02 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -1,13 +1,18 @@ import type { WorkspaceClient } from "@databricks/sdk-experimental"; import type express from "express"; -import type { - IAppRouter, - PluginExecuteConfig, - SQLTypeMarker, - StreamExecutionSettings, +import { + type AnalyticsSseMessage, + type IAppRouter, + makeArrowInlineMessage, + makeArrowMessage, + makeResultMessage, + type PluginExecuteConfig, + type SQLTypeMarker, + type StreamExecutionSettings, } from "shared"; import { SQLWarehouseConnector } from "../../connectors"; import { getWarehouseId, getWorkspaceClient } from "../../context"; +import { ExecutionError } from "../../errors"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; import type { PluginManifest } from "../../registry"; @@ -15,6 +20,7 @@ import { queryDefaults } from "./defaults"; import manifest from "./manifest.json"; import { QueryProcessor } from "./query"; import type { + AnalyticsFormat, AnalyticsQueryResponse, IAnalyticsConfig, IAnalyticsQueryRequest, @@ -115,7 +121,15 @@ export class AnalyticsPlugin extends Plugin { res: express.Response, ): Promise { const { query_key } = req.params; - const { parameters, format = "JSON_ARRAY" } = req.body as IAnalyticsQueryRequest; + const { parameters, format = "JSON_ARRAY" } = + req.body as IAnalyticsQueryRequest; + + if (format !== "JSON_ARRAY" && format !== "ARROW_STREAM") { + res.status(400).json({ + error: `Invalid format: ${String(format)}. Expected "JSON_ARRAY" or "ARROW_STREAM".`, + }); + return; + } // Request-scoped logging with WideEvent tracking logger.debug(req, "Executing query: %s (format=%s)", query_key, format); @@ -150,34 +164,33 @@ export class AnalyticsPlugin extends Plugin { const executor = isAsUser ? this.asUser(req) : this; const executorKey = isAsUser ? this.resolveUserId(req) : "global"; - const queryParameters = - format === "ARROW_STREAM" - ? { - formatParameters: { - disposition: "EXTERNAL_LINKS", - format: "ARROW_STREAM", - }, - type: "arrow", - } - : { - type: "result", - }; - const hashedQuery = this.queryProcessor.hashQuery(query); + // ARROW_STREAM may resolve to EXTERNAL_LINKS, which returns pre-signed URLs + // that typically expire ~15 minutes after issue. Cap the cache TTL well + // under that for ARROW_STREAM so we never hand out dead URLs from cache, + // while still benefiting from caching INLINE attachment responses (and + // EXTERNAL_LINKS responses inside their valid window). + const cacheTtl = + format === "ARROW_STREAM" + ? Math.min(queryDefaults.cache?.ttl ?? 600, 600) + : queryDefaults.cache?.ttl; + const cacheConfig = { + ...queryDefaults.cache, + ttl: cacheTtl, + cacheKey: [ + "analytics:query", + query_key, + JSON.stringify(parameters), + format, + hashedQuery, + executorKey, + ], + }; + const defaultConfig: PluginExecuteConfig = { ...queryDefaults, - cache: { - ...queryDefaults.cache, - cacheKey: [ - "analytics:query", - query_key, - JSON.stringify(parameters), - JSON.stringify(format), - hashedQuery, - executorKey, - ], - }, + cache: cacheConfig, }; const streamExecutionSettings: StreamExecutionSettings = { @@ -192,20 +205,94 @@ export class AnalyticsPlugin extends Plugin { parameters, ); - const result = await executor.query( + return this._executeWithFormatFallback( + executor, query, processedParams, - queryParameters.formatParameters, + format, signal, ); - - return { type: queryParameters.type, ...result }; }, streamExecutionSettings, executorKey, ); } + /** + * Execute a query with automatic disposition fallback for ARROW_STREAM. + * + * - JSON_ARRAY: always uses INLINE disposition, no fallback. + * - ARROW_STREAM: tries INLINE first, falls back to EXTERNAL_LINKS. + * This handles warehouses that only support one disposition. + */ + private async _executeWithFormatFallback( + executor: AnalyticsPlugin, + query: string, + processedParams: + | Record + | undefined, + requestedFormat: AnalyticsFormat, + signal?: AbortSignal, + ): Promise { + if (requestedFormat === "JSON_ARRAY") { + const result = await executor.query( + query, + processedParams, + { disposition: "INLINE", format: "JSON_ARRAY" }, + signal, + ); + return makeResultMessage(result?.data, { + status: result?.status, + statement_id: result?.statement_id, + }); + } + + // ARROW_STREAM: try INLINE first, fall back to EXTERNAL_LINKS. + try { + const result = await executor.query( + query, + processedParams, + { disposition: "INLINE", format: "ARROW_STREAM" }, + signal, + ); + // INLINE responses with an Arrow IPC attachment are forwarded as base64 + // for the client to decode into an Arrow Table. Anything else (rare: + // data_array under ARROW_STREAM, or an empty result) falls back to the + // generic "result" payload. + if (result?.attachment) { + return makeArrowInlineMessage(result.attachment); + } + return makeResultMessage(result?.data, { + status: result?.status, + statement_id: result?.statement_id, + }); + } catch (err: unknown) { + // If the request was aborted, do not retry — the signal is dead and + // a second statement would be billed but never read. + if (signal?.aborted) { + throw err; + } + + if (!_isInlineArrowUnsupported(err)) { + throw err; + } + + const msg = err instanceof Error ? err.message : String(err); + logger.warn( + "ARROW_STREAM INLINE rejected by warehouse, falling back to EXTERNAL_LINKS: %s", + msg, + ); + } + + const result = await executor.query( + query, + processedParams, + { disposition: "EXTERNAL_LINKS", format: "ARROW_STREAM" }, + signal, + ); + return makeArrowMessage(result.statement_id, { status: result.status }); + } + /** * Execute a SQL query using the current execution context. * @@ -276,6 +363,48 @@ export class AnalyticsPlugin extends Plugin { } } +/** + * Determine whether a warehouse error indicates that ARROW_STREAM + INLINE + * is unsupported, vs an unrelated SQL/permission error. + * + * Preferred path: read the structured `errorCode` we now propagate from the + * SDK's `ApiError.errorCode` and the warehouse's `status.error.error_code` + * through `ExecutionError`. This is stable across error-message wording + * changes. + * + * Substring backstop: if the upstream error didn't surface a code (legacy + * SDK builds, or errors thrown outside the connector's wrap path), fall + * back to requiring both INLINE and ARROW_STREAM keywords in the message + * plus a marker phrase. The pair-requirement avoids matching unrelated SQL + * errors that happen to mention one of the words (e.g. a column named + * `INLINE_USERS`). + */ +function _isInlineArrowUnsupported(err: unknown): boolean { + const structuredCode = + err instanceof ExecutionError ? err.errorCode : undefined; + if ( + structuredCode === "INVALID_PARAMETER_VALUE" || + structuredCode === "NOT_IMPLEMENTED" + ) { + // Structured code already tells us the warehouse rejected the request. + // Require keyword pairing to confirm it's the disposition/format combo + // (vs an INVALID_PARAMETER_VALUE for something else entirely). + const msg = err instanceof Error ? err.message : String(err); + return msg.includes("INLINE") && msg.includes("ARROW_STREAM"); + } + + // Backstop for errors without a structured code. + const msg = err instanceof Error ? err.message : String(err); + if (!msg.includes("INLINE") || !msg.includes("ARROW_STREAM")) { + return false; + } + return ( + msg.includes("not supported") || + msg.includes("INVALID_PARAMETER_VALUE") || + msg.includes("NOT_IMPLEMENTED") + ); +} + /** * @internal */ diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index 9a30440ed..6e6886afd 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -584,6 +584,455 @@ describe("Analytics Plugin", () => { ); }); + test("/query/:query_key should pass INLINE + ARROW_STREAM format parameters when format is ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "INLINE", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should use INLINE + JSON_ARRAY by default when no format specified", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + disposition: "INLINE", + format: "JSON_ARRAY", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should pass INLINE + JSON_ARRAY when format is explicitly JSON_ARRAY", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON_ARRAY" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + }); + + test("/query/:query_key should fall back ARROW_STREAM from INLINE to EXTERNAL_LINKS when warehouse rejects INLINE", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error( + "INVALID_PARAMETER_VALUE: ARROW_STREAM not supported with INLINE disposition", + ), + ) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // First call: INLINE (rejected) + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // Second call: EXTERNAL_LINKS (fallback) + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key falls back on a structured ExecutionError.errorCode without scanning the message", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Properly-structured ExecutionError, as the connector now produces + // when the SDK's ApiError surfaces with errorCode set. + const { ExecutionError } = await import("../../../errors/execution"); + const structuredError = ExecutionError.statementFailed( + "ARROW_STREAM is not supported with INLINE disposition", + "INVALID_PARAMETER_VALUE", + ); + + const executeMock = vi + .fn() + .mockRejectedValueOnce(structuredError) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Both attempts: INLINE (rejected via structured code) → EXTERNAL_LINKS. + expect(executeMock).toHaveBeenCalledTimes(2); + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key falls back when error message carries a structured INVALID_PARAMETER_VALUE error_code", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Wrapped JSON error like the SDK surfaces from a `Bad Request` HTTP + // response. Both INLINE and ARROW_STREAM appear, plus the structured code. + const wrappedJsonError = new Error( + 'Response from server (Bad Request) {"error_code":"INVALID_PARAMETER_VALUE","message":"ARROW_STREAM is not supported with INLINE disposition on this warehouse"}', + ); + const executeMock = vi + .fn() + .mockRejectedValueOnce(wrappedJsonError) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Both attempts ran: INLINE (rejected) then EXTERNAL_LINKS (succeeded). + expect(executeMock).toHaveBeenCalledTimes(2); + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key does NOT fall back when only one of INLINE/ARROW_STREAM appears in the error", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Realistic non-format error that mentions just one of the keywords — + // e.g. an unrelated INVALID_PARAMETER_VALUE about a different param. + const executeMock = vi + .fn() + .mockRejectedValue( + new Error( + 'Response from server (Bad Request) {"error_code":"INVALID_PARAMETER_VALUE","message":"INLINE is not a valid value for parameter `mode`"}', + ), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // The retry interceptor may attempt the query multiple times, but the + // analytics plugin must never escalate to EXTERNAL_LINKS for an error + // that doesn't actually indicate a format/disposition rejection. + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key should not fall back for non-format errors", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue(new Error("PERMISSION_DENIED: no access")); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Only one call — non-format error is not retried with different disposition. + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key emits arrow_inline SSE event when ARROW_STREAM INLINE returns an attachment", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const fakeAttachment = "BASE64_ARROW_IPC_BYTES"; + const executeMock = vi.fn().mockResolvedValue({ + result: { attachment: fakeAttachment, row_count: 1 }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // The route should not fall back to EXTERNAL_LINKS — INLINE succeeded. + expect(executeMock).toHaveBeenCalledTimes(1); + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // SSE payload should use the new arrow_inline message type. + const writeCalls = (mockRes.write as any).mock.calls.map( + (c: any[]) => c[0] as string, + ); + const payload = writeCalls.find((s: string) => s.startsWith("data: ")); + expect(payload).toBeDefined(); + expect(payload).toContain('"type":"arrow_inline"'); + expect(payload).toContain(`"attachment":"${fakeAttachment}"`); + }); + + test("/query/:query_key rejects unknown format values with 400", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + const executeMock = vi.fn(); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(executeMock).not.toHaveBeenCalled(); + }); + + test("/query/:query_key does not retry the fallback when the request was aborted", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockImplementation((_wc, _opts, signal) => { + // Simulate a signal that becomes aborted before the failure surfaces — + // e.g. the client cancelled the SSE stream mid-query. Use vitest's + // getter spy rather than Object.defineProperty so we don't try to + // override the native non-configurable AbortSignal.aborted getter. + if (signal) { + vi.spyOn(signal, "aborted", "get").mockReturnValue(true); + } + return Promise.reject( + new Error( + "INVALID_PARAMETER_VALUE: ARROW_STREAM not supported with INLINE disposition", + ), + ); + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Even though the error message would normally trigger fallback, the + // aborted signal should short-circuit and prevent a second statement. + expect(executeMock).toHaveBeenCalledTimes(1); + }); + + test("/query/:query_key should not fall back when format is explicitly JSON_ARRAY", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON_ARRAY" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls use JSON_ARRAY + INLINE — explicit JSON_ARRAY, no fallback. + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + } + }); + test("should return 404 when query file is not found", async () => { const plugin = new AnalyticsPlugin(config); const { router, getHandler } = createMockRouter(); diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index c8fc91591..9212ebca3 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -1,6 +1,12 @@ export const streamDefaults = { bufferSize: 100, - maxEventSize: 1024 * 1024, // 1MB + // 8 MiB. Sized to fit base64-encoded inline Arrow IPC attachments from + // serverless warehouses (analytics queries typically return well under 1 MiB, + // but ARROW_STREAM + INLINE can carry up to ~25 MiB per the Databricks API). + // The connector enforces the same cap (`MAX_INLINE_ATTACHMENT_BYTES`) so + // anything that would exceed this fails fast at the connector with a clear + // error rather than a confusing SSE buffer-exceeded. + maxEventSize: 8 * 1024 * 1024, bufferTTL: 10 * 60 * 1000, // 10 minutes cleanupInterval: 5 * 60 * 1000, // 5 minutes maxPersistentBuffers: 10000, // 10000 buffers diff --git a/packages/appkit/src/type-generator/query-registry.ts b/packages/appkit/src/type-generator/query-registry.ts index 196690c2d..63c531d15 100644 --- a/packages/appkit/src/type-generator/query-registry.ts +++ b/packages/appkit/src/type-generator/query-registry.ts @@ -1,6 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { WorkspaceClient } from "@databricks/sdk-experimental"; +import { tableFromIPC } from "apache-arrow"; import pc from "picocolors"; import { createLogger } from "../logging/logger"; import { CACHE_VERSION, hashSQL, loadCache, saveCache } from "./cache"; @@ -129,18 +130,69 @@ function formatParametersType(sql: string): string { : "Record"; } +/** + * Decode a base64 Arrow IPC attachment from a DESCRIBE QUERY response and + * extract column metadata. Returns the same shape as rows parsed from the + * legacy data_array path. + * + * IMPORTANT: a DESCRIBE QUERY response is itself a result *table* with rows + * shaped like `(col_name, data_type, comment)` describing the user query's + * output schema. We must read those rows — NOT `table.schema.fields`, which + * would describe DESCRIBE QUERY's own output (`col_name`, `data_type`, + * `comment`) and yield bogus types for every query. + */ +function columnsFromArrowAttachment( + attachment: string, +): Array<{ name: string; type_name: string; comment: string | undefined }> { + const buf = Buffer.from(attachment, "base64"); + const table = tableFromIPC(buf); + return table.toArray().map((row) => { + const obj = row.toJSON() as { + col_name?: unknown; + data_type?: unknown; + comment?: unknown; + }; + return { + name: typeof obj.col_name === "string" ? obj.col_name : "", + type_name: + typeof obj.data_type === "string" + ? obj.data_type.toUpperCase() + : "STRING", + comment: + typeof obj.comment === "string" && obj.comment !== "" + ? obj.comment + : undefined, + }; + }); +} + export function convertToQueryType( result: DatabricksStatementExecutionResponse, sql: string, queryName: string, ): { type: string; hasResults: boolean } { const dataRows = result.result?.data_array || []; - const columns = dataRows.map((row) => ({ + let columns = dataRows.map((row) => ({ name: row[0] || "", type_name: row[1]?.toUpperCase() || "STRING", comment: row[2] || undefined, })); + // Fallback: serverless warehouses return ARROW_STREAM format with an inline + // base64 attachment instead of data_array. Decode the Arrow IPC rows (the + // DESCRIBE QUERY result table) to extract column names and types. + if (columns.length === 0 && result.result?.attachment) { + logger.debug("data_array empty, decoding Arrow IPC attachment for schema"); + try { + columns = columnsFromArrowAttachment(result.result.attachment); + } catch (err) { + logger.warn( + "Failed to decode Arrow IPC attachment: %s", + err instanceof Error ? err.message : String(err), + ); + } + } + const paramsType = formatParametersType(sql); // generate result fields with JSDoc @@ -386,10 +438,42 @@ export async function generateQueriesFromDescribe( sqlHash, cleanedSql, }: (typeof uncachedQueries)[number]): Promise => { - const result = (await client.statementExecution.executeStatement({ - statement: `DESCRIBE QUERY ${cleanedSql}`, - warehouse_id: warehouseId, - })) as DatabricksStatementExecutionResponse; + // Prefer JSON_ARRAY + INLINE so `data_array` parsing works directly. + // Some serverless warehouses reject this combination — fall back to + // ARROW_STREAM + INLINE (still inline, just a different format) and + // let `convertToQueryType` decode the inline attachment. Forcing + // INLINE on the retry avoids EXTERNAL_LINKS, which would silently + // produce empty `data_array` and degrade types to `unknown`. + let result: DatabricksStatementExecutionResponse; + try { + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + format: "JSON_ARRAY", + disposition: "INLINE", + })) as DatabricksStatementExecutionResponse; + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + const looksLikeFormatRejection = + msg.includes("JSON_ARRAY") && + (msg.includes("not supported") || + msg.includes("INVALID_PARAMETER_VALUE") || + msg.includes("NOT_IMPLEMENTED")); + if (looksLikeFormatRejection) { + logger.debug( + "Warehouse rejected JSON_ARRAY+INLINE for %s, retrying with ARROW_STREAM+INLINE", + queryName, + ); + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + format: "ARROW_STREAM", + disposition: "INLINE", + })) as DatabricksStatementExecutionResponse; + } else { + throw err; + } + } completed++; spinner.update( @@ -397,10 +481,11 @@ export async function generateQueriesFromDescribe( ); logger.debug( - "DESCRIBE result for %s: state=%s, rows=%d", + "DESCRIBE result for %s: state=%s, rows=%d, hasAttachment=%s", queryName, result.status.state, result.result?.data_array?.length ?? 0, + !!result.result?.attachment, ); if (result.status.state === "FAILED") { diff --git a/packages/appkit/src/type-generator/tests/query-registry.test.ts b/packages/appkit/src/type-generator/tests/query-registry.test.ts index 8d46f98e9..63a5636b9 100644 --- a/packages/appkit/src/type-generator/tests/query-registry.test.ts +++ b/packages/appkit/src/type-generator/tests/query-registry.test.ts @@ -1,4 +1,24 @@ -import { describe, expect, test } from "vitest"; +import { Table, tableToIPC, vectorFromArray } from "apache-arrow"; +import { describe, expect, test, vi } from "vitest"; + +const { mockLoggerWarn, mockLoggerDebug } = vi.hoisted(() => ({ + mockLoggerWarn: vi.fn(), + mockLoggerDebug: vi.fn(), +})); +vi.mock("../../logging/logger", () => ({ + createLogger: vi.fn(() => ({ + debug: mockLoggerDebug, + info: vi.fn(), + warn: mockLoggerWarn, + error: vi.fn(), + event: vi.fn(() => ({ + set: vi.fn().mockReturnThis(), + setComponent: vi.fn().mockReturnThis(), + setContext: vi.fn().mockReturnThis(), + })), + })), +})); + import { convertToQueryType, defaultForType, @@ -11,6 +31,20 @@ import { } from "../query-registry"; import type { DatabricksStatementExecutionResponse } from "../types"; +// Build a base64 Arrow IPC payload that mimics a DESCRIBE QUERY response — +// a result *table* with columns (col_name, data_type, comment) describing +// the user query's output schema. +function describeQueryAttachment( + rows: Array<{ col_name: string; data_type: string; comment: string | null }>, +): string { + const table = new Table({ + col_name: vectorFromArray(rows.map((r) => r.col_name)), + data_type: vectorFromArray(rows.map((r) => r.data_type)), + comment: vectorFromArray(rows.map((r) => r.comment ?? "")), + }); + return Buffer.from(tableToIPC(table, "stream")).toString("base64"); +} + describe("normalizeTypeName", () => { test("returns simple types unchanged", () => { expect(normalizeTypeName("STRING")).toBe("STRING"); @@ -346,6 +380,107 @@ SELECT * FROM users WHERE date = :startDate AND count = :count AND name = :name` ); expect(hasResults).toBe(false); }); + + describe("ARROW_STREAM attachment fallback (serverless warehouses)", () => { + test("decodes column metadata from Arrow IPC data rows, not schema fields", () => { + // Critical regression test: it would be a bug to read + // `table.schema.fields` here, which would generate types like + // { col_name: string; data_type: string; comment: string } for every + // query (those are DESCRIBE QUERY's own output columns). We must read + // the data rows. + const attachment = describeQueryAttachment([ + { col_name: "user_id", data_type: "BIGINT", comment: null }, + { col_name: "name", data_type: "STRING", comment: "display name" }, + { col_name: "active", data_type: "BOOLEAN", comment: null }, + ]); + const response: DatabricksStatementExecutionResponse = { + statement_id: "test-arrow", + status: { state: "SUCCEEDED" }, + result: { attachment }, + }; + + const { type, hasResults } = convertToQueryType( + response, + "SELECT user_id, name, active FROM users", + "users", + ); + + expect(hasResults).toBe(true); + // Real query columns appear in the generated type: + expect(type).toContain("user_id: number"); + expect(type).toContain("name: string"); + expect(type).toContain("active: boolean"); + // Column comments survive: + expect(type).toContain("/** display name"); + // The DESCRIBE QUERY metadata column names must NOT leak as user types: + expect(type).not.toContain("col_name: string"); + expect(type).not.toContain("data_type: string"); + }); + + test("normalizes lowercase data_type values to uppercase", () => { + const attachment = describeQueryAttachment([ + { col_name: "id", data_type: "int", comment: null }, + ]); + const response: DatabricksStatementExecutionResponse = { + statement_id: "test-arrow", + status: { state: "SUCCEEDED" }, + result: { attachment }, + }; + + const { type } = convertToQueryType(response, "SELECT 1", "test"); + expect(type).toContain("@sqlType INT"); + expect(type).toContain("id: number"); + }); + + test("prefers data_array over attachment when both are present", () => { + const attachment = describeQueryAttachment([ + { col_name: "from_arrow", data_type: "STRING", comment: null }, + ]); + const response: DatabricksStatementExecutionResponse = { + statement_id: "test-both", + status: { state: "SUCCEEDED" }, + result: { + data_array: [["from_data_array", "INT", null]], + attachment, + }, + }; + + const { type } = convertToQueryType(response, "SELECT 1", "test"); + expect(type).toContain("from_data_array: number"); + expect(type).not.toContain("from_arrow"); + }); + + test("logs a warning and yields the unknown-result fallback on malformed attachment", () => { + mockLoggerWarn.mockClear(); + const response: DatabricksStatementExecutionResponse = { + statement_id: "test-bad", + status: { state: "SUCCEEDED" }, + result: { attachment: "not-valid-arrow-ipc" }, + }; + + const { hasResults, type } = convertToQueryType( + response, + "SELECT 1", + "test", + ); + + // No columns extracted → unknown-result type, hasResults false. + expect(hasResults).toBe(false); + expect(type).toContain("unknown"); + // None of DESCRIBE QUERY's metadata column names should leak in as + // user-facing type fields — that would mean the parser swallowed + // the failure and produced bogus columns instead. + expect(type).not.toContain("col_name"); + expect(type).not.toContain("data_type"); + + // The warning must fire so a regression that silently produces empty + // types (no telemetry signal) fails this test. + expect(mockLoggerWarn).toHaveBeenCalledWith( + expect.stringContaining("Failed to decode Arrow IPC attachment"), + expect.any(String), + ); + }); + }); }); describe("inferParameterTypes", () => { diff --git a/packages/appkit/src/type-generator/types.ts b/packages/appkit/src/type-generator/types.ts index 5af43591a..9a591f512 100644 --- a/packages/appkit/src/type-generator/types.ts +++ b/packages/appkit/src/type-generator/types.ts @@ -12,6 +12,8 @@ export interface DatabricksStatementExecutionResponse { }; result?: { data_array?: (string | null)[][]; + /** Base64-encoded Arrow IPC bytes (returned by serverless warehouses using ARROW_STREAM format) */ + attachment?: string; }; } diff --git a/packages/shared/package.json b/packages/shared/package.json index 27d268ca3..bff3a542c 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -40,6 +40,7 @@ "ajv": "8.17.1", "ajv-formats": "3.0.1", "@clack/prompts": "1.0.1", - "commander": "12.1.0" + "commander": "12.1.0", + "zod": "3.23.8" } } diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 627d70d6c..d18740bb0 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -3,4 +3,5 @@ export * from "./execute"; export * from "./genie"; export * from "./plugin"; export * from "./sql"; +export * from "./sse/analytics"; export * from "./tunnel"; diff --git a/packages/shared/src/sse/analytics.test.ts b/packages/shared/src/sse/analytics.test.ts new file mode 100644 index 000000000..5abeb83e0 --- /dev/null +++ b/packages/shared/src/sse/analytics.test.ts @@ -0,0 +1,87 @@ +import { describe, expect, test } from "vitest"; +import { + AnalyticsSseMessage, + makeArrowInlineMessage, + makeArrowMessage, + makeResultMessage, +} from "./analytics"; + +describe("AnalyticsSseMessage schema", () => { + test("accepts a result message with rows", () => { + const parsed = AnalyticsSseMessage.parse({ + type: "result", + data: [{ id: 1, name: "alice" }], + }); + expect(parsed.type).toBe("result"); + }); + + test("accepts a result message with no data (empty result)", () => { + expect(() => AnalyticsSseMessage.parse({ type: "result" })).not.toThrow(); + }); + + test("accepts an arrow message with statement_id", () => { + const parsed = AnalyticsSseMessage.parse({ + type: "arrow", + statement_id: "stmt-1", + }); + expect(parsed.type).toBe("arrow"); + }); + + test("rejects an arrow message with empty statement_id", () => { + expect(() => + AnalyticsSseMessage.parse({ type: "arrow", statement_id: "" }), + ).toThrow(); + }); + + test("rejects an arrow message with no statement_id", () => { + expect(() => AnalyticsSseMessage.parse({ type: "arrow" })).toThrow(); + }); + + test("accepts an arrow_inline message with non-empty attachment", () => { + const parsed = AnalyticsSseMessage.parse({ + type: "arrow_inline", + attachment: "AQID", + }); + expect(parsed.type).toBe("arrow_inline"); + }); + + test("rejects an arrow_inline message with empty attachment", () => { + expect(() => + AnalyticsSseMessage.parse({ type: "arrow_inline", attachment: "" }), + ).toThrow(); + }); + + test("rejects an arrow_inline message with non-string attachment", () => { + expect(() => + AnalyticsSseMessage.parse({ type: "arrow_inline", attachment: 123 }), + ).toThrow(); + }); + + test("rejects an unknown type", () => { + expect(() => + AnalyticsSseMessage.parse({ type: "unknown_kind", foo: "bar" }), + ).toThrow(); + }); + + test("safeParse returns success: false for malformed payloads", () => { + const r = AnalyticsSseMessage.safeParse({ type: "arrow_inline" }); + expect(r.success).toBe(false); + }); +}); + +describe("typed builders", () => { + test("makeResultMessage roundtrips through the schema", () => { + const msg = makeResultMessage([{ id: 1 }], { statement_id: "s-1" }); + expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow(); + }); + + test("makeArrowMessage roundtrips through the schema", () => { + const msg = makeArrowMessage("stmt-2"); + expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow(); + }); + + test("makeArrowInlineMessage roundtrips through the schema", () => { + const msg = makeArrowInlineMessage("AQID"); + expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow(); + }); +}); diff --git a/packages/shared/src/sse/analytics.ts b/packages/shared/src/sse/analytics.ts new file mode 100644 index 000000000..1f136af29 --- /dev/null +++ b/packages/shared/src/sse/analytics.ts @@ -0,0 +1,95 @@ +import { z } from "zod"; + +/** + * Wire protocol for analytics SSE messages emitted by `/api/analytics/query`. + * + * These schemas are the single source of truth for the contract between the + * server (`AnalyticsPlugin._handleQueryRoute`) and the client + * (`useAnalyticsQuery`). Both sides validate with the same schema: + * + * - Server uses the typed builders (`makeResultMessage`, `makeArrowMessage`, + * `makeArrowInlineMessage`) to construct messages with compile-time + * guarantees that all required fields are present. + * - Client calls `AnalyticsSseMessage.parse(JSON.parse(event.data))` to fail + * loudly on a malformed payload instead of silently treating an undefined + * field as data. + * + * Adding a new message variant requires a schema update here, which keeps + * server and client in lockstep. + */ + +/** Successful row-shaped result (JSON_ARRAY format, or empty results). */ +export const AnalyticsResultMessage = z.object({ + type: z.literal("result"), + // zod 4 requires both key and value type for z.record(); zod 3 took + // value only. Using the explicit two-arg form keeps the schema valid + // under whichever zod major resolves at install time. + data: z.array(z.record(z.string(), z.unknown())).optional(), + // Status is opaque metadata forwarded from the warehouse — keep it as + // `unknown` so we don't bake the SDK's detailed shape into the contract. + status: z.unknown().optional(), + statement_id: z.string().optional(), +}); +export type AnalyticsResultMessage = z.infer; + +/** + * ARROW_STREAM result delivered via /arrow-result/:jobId — used for + * EXTERNAL_LINKS responses (statement_id from the warehouse) and, if PR #320 + * lands, also for INLINE responses (synthetic `inline-` prefixed id from + * the server-side stash). + */ +export const AnalyticsArrowMessage = z.object({ + type: z.literal("arrow"), + statement_id: z.string().min(1), + status: z.unknown().optional(), +}); +export type AnalyticsArrowMessage = z.infer; + +/** + * ARROW_STREAM + INLINE result with the base64-encoded Arrow IPC bytes + * embedded in the SSE message. The client decodes locally via + * `ArrowClient.processArrowBuffer`. + * + * Note: this variant goes away if the proposal in PR #320 lands. + */ +export const AnalyticsArrowInlineMessage = z.object({ + type: z.literal("arrow_inline"), + attachment: z.string().min(1), +}); +export type AnalyticsArrowInlineMessage = z.infer< + typeof AnalyticsArrowInlineMessage +>; + +/** Discriminated union of every message the analytics SSE stream may emit. */ +export const AnalyticsSseMessage = z.discriminatedUnion("type", [ + AnalyticsResultMessage, + AnalyticsArrowMessage, + AnalyticsArrowInlineMessage, +]); +export type AnalyticsSseMessage = z.infer; + +// ──────────────────────────────────────────────────────────────────────────── +// Typed builders — call from the server route handler. The compiler enforces +// that every required field is supplied, and the return type narrows so +// downstream code (executeStream / SSE writer) keeps full type information. +// ──────────────────────────────────────────────────────────────────────────── + +export function makeResultMessage( + data: Record[] | undefined, + extras: { status?: unknown; statement_id?: string } = {}, +): AnalyticsResultMessage { + return { type: "result", data, ...extras }; +} + +export function makeArrowMessage( + statement_id: string, + extras: { status?: unknown } = {}, +): AnalyticsArrowMessage { + return { type: "arrow", statement_id, ...extras }; +} + +export function makeArrowInlineMessage( + attachment: string, +): AnalyticsArrowInlineMessage { + return { type: "arrow_inline", attachment }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fc6e389ef..da5f786a8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -299,6 +299,9 @@ importers: '@types/semver': specifier: 7.7.1 version: 7.7.1 + apache-arrow: + specifier: 21.1.0 + version: 21.1.0 dotenv: specifier: 16.6.1 version: 16.6.1 @@ -326,9 +329,6 @@ importers: ws: specifier: 8.18.3 version: 8.18.3(bufferutil@4.0.9) - zod: - specifier: 4.3.6 - version: 4.3.6 devDependencies: '@types/express': specifier: 4.17.25 @@ -542,6 +542,9 @@ importers: commander: specifier: 12.1.0 version: 12.1.0 + zod: + specifier: 3.23.8 + version: 3.23.8 devDependencies: '@types/express': specifier: 4.17.23 @@ -11920,12 +11923,12 @@ packages: peerDependencies: zod: ^3.25.0 || ^4.0.0 + zod@3.23.8: + resolution: {integrity: sha512-XBx9AXhXktjUqnepgTiE5flcKIYWi/rme0Eaj+5Y0lftuGBq+jyRu/md4WnuxqgP1ubdpNCsYEYPxrzVHD8d6g==} + zod@4.1.13: resolution: {integrity: sha512-AvvthqfqrAhNH9dnfmrfKzX5upOdjUVJYFqNSlkmGf64gRaTzlPwz99IHYnVs28qYAybvAlBV+H7pn0saFY4Ig==} - zod@4.3.6: - resolution: {integrity: sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==} - zrender@6.0.0: resolution: {integrity: sha512-41dFXEEXuJpNecuUQq6JlbybmnHaqqpGlbH1yxnA5V9MMP4SbohSVZsJIwz+zdjQXSSlR1Vc34EgH1zxyTDvhg==} @@ -11937,33 +11940,33 @@ packages: snapshots: - '@ai-sdk/gateway@2.0.21(zod@4.3.6)': + '@ai-sdk/gateway@2.0.21(zod@4.1.13)': dependencies: '@ai-sdk/provider': 2.0.0 - '@ai-sdk/provider-utils': 3.0.19(zod@4.3.6) + '@ai-sdk/provider-utils': 3.0.19(zod@4.1.13) '@vercel/oidc': 3.0.5 - zod: 4.3.6 + zod: 4.1.13 - '@ai-sdk/provider-utils@3.0.19(zod@4.3.6)': + '@ai-sdk/provider-utils@3.0.19(zod@4.1.13)': dependencies: '@ai-sdk/provider': 2.0.0 '@standard-schema/spec': 1.1.0 eventsource-parser: 3.0.6 - zod: 4.3.6 + zod: 4.1.13 '@ai-sdk/provider@2.0.0': dependencies: json-schema: 0.4.0 - '@ai-sdk/react@2.0.115(react@19.2.0)(zod@4.3.6)': + '@ai-sdk/react@2.0.115(react@19.2.0)(zod@4.1.13)': dependencies: - '@ai-sdk/provider-utils': 3.0.19(zod@4.3.6) - ai: 5.0.113(zod@4.3.6) + '@ai-sdk/provider-utils': 3.0.19(zod@4.1.13) + ai: 5.0.113(zod@4.1.13) react: 19.2.0 swr: 2.3.8(react@19.2.0) throttleit: 2.1.0 optionalDependencies: - zod: 4.3.6 + zod: 4.1.13 '@algolia/abtesting@1.12.0': dependencies: @@ -13624,14 +13627,14 @@ snapshots: '@docsearch/react@4.3.2(@algolia/client-search@5.46.0)(@types/react@19.2.7)(react-dom@19.2.0(react@19.2.0))(react@19.2.0)(search-insights@2.17.3)': dependencies: - '@ai-sdk/react': 2.0.115(react@19.2.0)(zod@4.3.6) + '@ai-sdk/react': 2.0.115(react@19.2.0)(zod@4.1.13) '@algolia/autocomplete-core': 1.19.2(@algolia/client-search@5.46.0)(algoliasearch@5.46.0)(search-insights@2.17.3) '@docsearch/core': 4.3.1(@types/react@19.2.7)(react-dom@19.2.0(react@19.2.0))(react@19.2.0) '@docsearch/css': 4.3.2 - ai: 5.0.113(zod@4.3.6) + ai: 5.0.113(zod@4.1.13) algoliasearch: 5.46.0 marked: 16.4.2 - zod: 4.3.6 + zod: 4.1.13 optionalDependencies: '@types/react': 19.2.7 react: 19.2.0 @@ -17778,13 +17781,13 @@ snapshots: clean-stack: 2.2.0 indent-string: 4.0.0 - ai@5.0.113(zod@4.3.6): + ai@5.0.113(zod@4.1.13): dependencies: - '@ai-sdk/gateway': 2.0.21(zod@4.3.6) + '@ai-sdk/gateway': 2.0.21(zod@4.1.13) '@ai-sdk/provider': 2.0.0 - '@ai-sdk/provider-utils': 3.0.19(zod@4.3.6) + '@ai-sdk/provider-utils': 3.0.19(zod@4.1.13) '@opentelemetry/api': 1.9.0 - zod: 4.3.6 + zod: 4.1.13 ajv-formats@2.1.1(ajv@8.17.1): optionalDependencies: @@ -21024,7 +21027,7 @@ snapshots: typescript: 5.9.3 unbash: 2.2.0 yaml: 2.8.2 - zod: 4.3.6 + zod: 4.1.13 langium@3.3.1: dependencies: @@ -25293,9 +25296,9 @@ snapshots: dependencies: zod: 4.1.13 - zod@4.1.13: {} + zod@3.23.8: {} - zod@4.3.6: {} + zod@4.1.13: {} zrender@6.0.0: dependencies: