Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8108dd5
feat(webapp): add a new backend for the realtime runs feed
ericallam Jun 8, 2026
13d2511
fix(webapp): harden the realtime runs backend
ericallam Jun 8, 2026
e066640
fix(webapp): enforce the realtime tag/batch result cap exactly
ericallam Jun 8, 2026
fdadc18
feat(webapp): give the realtime runs feed its own ClickHouse pool
ericallam Jun 8, 2026
5c42d55
fix(webapp): log realtime run-change pub/sub failures at error level
ericallam Jun 8, 2026
75848cf
feat(webapp): give the realtime runs feed its own pub/sub Redis
ericallam Jun 8, 2026
2b37946
fix(webapp): adapt the realtime run-id resolver to paginated listRunIds
ericallam Jun 8, 2026
57755b8
fix(webapp): JSON-encode the run-set cache key to avoid separator col…
ericallam Jun 8, 2026
fbbd783
perf(webapp): scale the realtime runs feed under high concurrency
ericallam Jun 9, 2026
2a1e927
refactor(webapp,run-engine): route the realtime runs feed through one…
ericallam Jun 9, 2026
3348cfe
fix(webapp): wake live feeds for mid-run metadata updates by their in…
ericallam Jun 9, 2026
3a45f3d
fix(webapp): stop concurrent batch subscribers sharing a realtime wor…
ericallam Jun 9, 2026
9f4c374
fix(webapp): catch change-routing failures instead of crashing the pr…
ericallam Jun 10, 2026
7c72cdf
fix(webapp): publish full realtime change records from the metadata a…
ericallam Jun 10, 2026
cab6b05
refactor(webapp): rename the realtime backend to native and tune it v…
ericallam Jun 10, 2026
77ac66a
fix(webapp): deliver realtime changes that land between long-polls im…
ericallam Jun 10, 2026
46a0cbb
fix(webapp): match multi-tag realtime subscriptions to contains-all s…
ericallam Jun 10, 2026
1b82875
feat(webapp): add delivery-lag and health metrics to the realtime bac…
ericallam Jun 10, 2026
337a4f0
fix(webapp): emit realtime backend metrics through OpenTelemetry
ericallam Jun 10, 2026
ff0e9ef
feat(webapp): add a local Grafana dashboard for the realtime backend …
ericallam Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/realtime-runs-subscription-scalability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag
2 changes: 1 addition & 1 deletion apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.6",
"ioredis": "^5.3.2",
"ioredis": "~5.6.0",
"p-limit": "^6.2.0",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
// Touch the sessions replication singleton at entry so it boots deterministically
// on webapp startup. The singleton's initializer wires start (gated on
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
Expand Down Expand Up @@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {

singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
// Attach the realtime run-changed publish delegations to the engine event bus.
// No-ops (registers nothing) unless REALTIME_BACKEND_NATIVE_ENABLED=1.
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);

// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
// duplicate copies of the processor — Sentry's processor list lives in
Expand Down
79 changes: 79 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,43 @@ const EnvironmentSchema = z
.int()
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds

// Master switch for the native realtime backend; off = Electric serves everything, publishes no-op.
REALTIME_BACKEND_NATIVE_ENABLED: z.string().default("0"),
// Live long-poll backstop hold (ms); matches Electric's ~20s cadence.
REALTIME_BACKEND_NATIVE_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
// Jitter ratio on the live-poll hold (0.15 = ±15%) to avoid synchronized refetch herds.
REALTIME_BACKEND_NATIVE_LIVE_POLL_JITTER_RATIO: z.coerce.number().default(0.15),
// Hard cap on the tag-list snapshot size.
REALTIME_BACKEND_NATIVE_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
// TTL/size of the coalescing cache for the multi-run resolve+hydrate (same-filter feeds share one query).
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
// Size/TTL of the per-handle working-set cache used to diff multi-run live polls.
REALTIME_BACKEND_NATIVE_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS: z.coerce.number().int().default(300_000),
// Bucket (ms) the tag-list createdAt floor is quantized to so same-tag feeds share a cache entry; 0 disables.
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(100),
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.
REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
// Replay window (ms) for buffered change records delivered to newly-armed feeds; 0 disables.
REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS: z.coerce.number().int().default(2_000),
// Cap on buffered recent records per env (latest record per run).
REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS: z.coerce.number().int().default(512),
// Keep an env subscribed + buffering this long (ms) after its last feed closes; 0 disables.
REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS: z.coerce.number().int().default(5_000),
// Fallback per-env concurrent-connection limit when the org has none configured.
REALTIME_BACKEND_NATIVE_DEFAULT_CONCURRENCY_LIMIT: z.coerce.number().int().default(100_000),
// TTL/size of the single-run read-through cache that collapses duplicate refetch bursts.
REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS: z.coerce.number().int().default(250),
REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),

PUBSUB_REDIS_HOST: z
.string()
.optional()
Expand Down Expand Up @@ -332,6 +369,36 @@ const EnvironmentSchema = z
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// Dedicated pub/sub Redis for the native realtime backend; falls back to PUBSUB_REDIS_* then REDIS_*.
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => {
if (v !== undefined) return v;
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
return raw ? parseInt(raw) : undefined;
}),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED: z
.string()
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
.string()
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) in cluster mode; "0" forces classic pub/sub.
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),

DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
Expand Down Expand Up @@ -1608,6 +1675,18 @@ const EnvironmentSchema = z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
// Dedicated ClickHouse pool for the native backend's tag/batch id resolution; falls back to CLICKHOUSE_URL.
REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),
Expand Down
21 changes: 19 additions & 2 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
return environment ? toAuthenticated(environment) : null;
}

// The authenticated environment plus the run scalars the realtime publish needs.
// Both come from one taskRun read — see findEnvironmentFromRun.
export type EnvironmentFromRun = {
environment: AuthenticatedEnvironment;
runTags: string[];
batchId: string | null;
};

export async function findEnvironmentFromRun(
runId: string,
tx?: PrismaClientOrTransaction
): Promise<AuthenticatedEnvironment | null> {
): Promise<EnvironmentFromRun | null> {
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
// ride along for free — no extra query for the realtime publish to send a full record.
const taskRun = await (tx ?? $replica).taskRun.findFirst({
where: {
id: runId,
Expand All @@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
runtimeEnvironment: { include: authIncludeBase },
},
});
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
if (!taskRun?.runtimeEnvironment) {
return null;
}
return {
environment: toAuthenticated(taskRun.runtimeEnvironment),
runTags: taskRun.runTags,
batchId: taskRun.batchId,
};
}

export async function createNewSession(
Expand Down
11 changes: 10 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
Expand Down Expand Up @@ -184,7 +185,15 @@ const { action } = createActionApiRoute(
return json({ error: "Internal Server Error" }, { status: 500 });
}
if (pgResult) {
return json(pgResult, { status: 200 });
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
publishChangeRecord({
runId: pgResult.runId,
envId: env.id,
tags: pgResult.runTags,
batchId: pgResult.batchId,
});
return json({ metadata: pgResult.metadata }, { status: 200 });
}

// PG miss. Target run is either buffered or genuinely absent.
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";

// Pull the existing tags out of a buffer entry's serialised payload so
Expand Down Expand Up @@ -90,6 +91,14 @@ export async function action({ request, params }: ActionFunctionArgs) {
},
data: { runTags: { push: newTags } },
});
// Publish a run-changed record with the NEW tag set so tag feeds reindex
// (no-op unless enabled).
publishChangeRecord({
runId: taskRun.id,
envId: env.id,
tags: existing.concat(newTags),
batchId: taskRun.batchId,
});
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
},
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/routes/realtime.v1.batches.$batchId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -33,7 +33,10 @@ export const loader = createLoaderApiRoute(
},
},
async ({ authentication, request, resource: batchRun, apiVersion }) => {
return realtimeClient.streamBatch(
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamBatch.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamBatch(
request.url,
authentication.environment,
batchRun.id,
Expand Down
12 changes: 6 additions & 6 deletions apps/webapp/app/routes/realtime.v1.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import {
anyResource,
createLoaderApiRoute,
Expand Down Expand Up @@ -48,17 +48,17 @@ export const loader = createLoaderApiRoute(
},
},
async ({ authentication, request, resource: run, apiVersion }) => {
return realtimeClient.streamRun(
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamRun.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamRun(
request.url,
authentication.environment,
run.id,
apiVersion,
authentication.realtime,
request.headers.get("x-trigger-electric-version") ?? undefined,
// Propagate abort on client disconnect so the upstream Electric long-poll
// fetch is cancelled too. Without this, undici buffers from the unconsumed
// upstream response body accumulate until Electric's poll timeout, causing
// steady RSS growth on api (see docs/runbooks for the H1 isolation test).
// Propagate abort on client disconnect so the upstream Electric long-poll is cancelled too, else undici buffers grow RSS until the poll timeout.
getRequestAbortSignal()
);
}
Expand Down
14 changes: 6 additions & 8 deletions apps/webapp/app/routes/realtime.v1.runs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { z } from "zod";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import {
anyResource,
createLoaderApiRoute,
Expand All @@ -25,12 +25,7 @@ export const loader = createLoaderApiRoute(
authorization: {
action: "read",
resource: (_, __, searchParams) =>
// Pre-RBAC, the resource was the searchParams object itself and
// the legacy `checkAuthorization` iterated `Object.keys`, so a
// JWT with type-level `read:tags` (no id) granted access to the
// unfiltered runs stream. Including `{ type: "tags" }` here
// preserves that — per-id `read:tags:<tag>` still grants only
// when the filter includes that tag.
// `{ type: "tags" }` preserves pre-RBAC type-level `read:tags` access to the unfiltered stream; per-id `read:tags:<tag>` still grants only when the filter includes that tag.
anyResource([
{ type: "runs" },
{ type: "tags" },
Expand All @@ -39,7 +34,10 @@ export const loader = createLoaderApiRoute(
},
},
async ({ searchParams, authentication, request, apiVersion }) => {
return realtimeClient.streamRuns(
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamRuns.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamRuns(
request.url,
authentication.environment,
searchParams,
Expand Down
49 changes: 48 additions & 1 deletion apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,36 @@ function initializeRunEngineClickhouseClient(): ClickHouse {
});
}

/** Realtime runs feed tag/batch id resolution (`REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL`);
* falls back to the default client if unset. */
const defaultRealtimeClickhouseClient = singleton(
"realtimeClickhouseClient",
initializeRealtimeClickhouseClient
);

function initializeRealtimeClickhouseClient(): ClickHouse {
if (!env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL) {
return defaultClickhouseClient;
}

const url = new URL(env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL);
url.searchParams.delete("secure");

return new ClickHouse({
url: url.toString(),
name: "realtime-runs-clickhouse",
keepAlive: {
enabled: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL,
compression: {
request: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST === "1",
},
maxOpenConnections: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
}

/** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */
const defaultEventsClickhouseClient = singleton(
"eventsClickhouseClient",
Expand Down Expand Up @@ -257,7 +287,8 @@ export type ClientType =
| "logs"
| "query"
| "admin"
| "engine";
| "engine"
| "realtime";

function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse {
const parsed = new URL(url);
Expand Down Expand Up @@ -330,6 +361,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou
},
maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
case "realtime":
return new ClickHouse({
url: parsed.toString(),
name,
keepAlive: {
enabled: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL,
compression: {
request: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST === "1",
},
maxOpenConnections: env.REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
case "standard":
case "query":
case "admin":
Expand Down Expand Up @@ -398,6 +443,8 @@ export class ClickhouseFactory {
return defaultAdminClickhouseClient;
case "engine":
return defaultRunEngineClickhouseClient;
case "realtime":
return defaultRealtimeClickhouseClient;
}
}

Expand Down
Loading
Loading