From 30f8253803514dba2728da223f5d639a54301675 Mon Sep 17 00:00:00 2001 From: Wojciech Majewski Date: Sun, 7 Jun 2026 20:14:48 +0200 Subject: [PATCH] feat: add worker start mode --- .../__tests__/unit/PgflowClient.test.ts | 25 ++++ pkgs/client/src/lib/PgflowClient.ts | 7 ++ .../schemas/0056_table_worker_functions.sql | 5 + .../0057_function_track_worker_function.sql | 10 +- .../schemas/0059_function_ensure_workers.sql | 32 ++--- pkgs/core/src/database-types.ts | 5 +- ...0260607175525_pgflow_worker_start_mode.sql | 118 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../skips_process_start_mode.test.sql | 29 +++++ .../track_worker_function/start_mode.test.sql | 38 ++++++ pkgs/edge-worker/src/core/Queries.ts | 6 +- pkgs/edge-worker/src/core/WorkerLifecycle.ts | 3 +- pkgs/edge-worker/src/core/types.ts | 3 + .../src/flow/FlowWorkerLifecycle.ts | 3 +- .../src/platform/SupabasePlatformAdapter.ts | 1 + .../tests/types/platform-adapter.test-d.ts | 11 ++ .../FlowWorkerLifecycle.compilation.test.ts | 25 +++- pkgs/edge-worker/tests/unit/Queries.test.ts | 14 ++- .../unit/WorkerLifecycle.deprecation.test.ts | 25 +++- .../platform/SupabasePlatformAdapter.test.ts | 34 +++++ 20 files changed, 367 insertions(+), 30 deletions(-) create mode 100644 pkgs/core/supabase/migrations/20260607175525_pgflow_worker_start_mode.sql create mode 100644 pkgs/core/supabase/tests/ensure_workers/skips_process_start_mode.test.sql create mode 100644 pkgs/core/supabase/tests/track_worker_function/start_mode.test.sql diff --git a/pkgs/client/__tests__/unit/PgflowClient.test.ts b/pkgs/client/__tests__/unit/PgflowClient.test.ts index f9334dfd7..94875afd7 100644 --- a/pkgs/client/__tests__/unit/PgflowClient.test.ts +++ b/pkgs/client/__tests__/unit/PgflowClient.test.ts @@ -113,6 +113,31 @@ describe('PgflowClient', () => { expect(result).toBeNull(); }); + test('getRun returns null without logging when no run data is returned', async () => { + const { client, mocks } = createMockClient(); + const consoleErrorSpy = vi + .spyOn(console, 'error') + .mockImplementation(() => { + // Suppress the log while asserting it is not called. + }); + + mockRpcCall(mocks, { data: null, error: null }); + + const pgflowClient = new PgflowClient(client, { + realtimeStabilizationDelayMs: 0, + schedule: createSyncSchedule(), + }); + + try { + const result = await pgflowClient.getRun('nonexistent-id'); + + expect(result).toBeNull(); + expect(consoleErrorSpy).not.toHaveBeenCalled(); + } finally { + consoleErrorSpy.mockRestore(); + } + }); + test('emits events through callbacks', async () => { const { client, mocks } = createMockClient(); diff --git a/pkgs/client/src/lib/PgflowClient.ts b/pkgs/client/src/lib/PgflowClient.ts index f1ba550c1..e65229587 100644 --- a/pkgs/client/src/lib/PgflowClient.ts +++ b/pkgs/client/src/lib/PgflowClient.ts @@ -272,6 +272,13 @@ export class PgflowClient implements IFlowClien return flowRun; } catch (error) { + if ( + error instanceof Error && + error.message === `No data returned for run ${run_id}` + ) { + return null; + } + console.error('Error getting run:', error); // Re-throw if it's a validation error if (error instanceof Error && (error.message.includes('Invalid run data') || error.message.includes('Invalid step data'))) { diff --git a/pkgs/core/schemas/0056_table_worker_functions.sql b/pkgs/core/schemas/0056_table_worker_functions.sql index c2651d71b..92377b48e 100644 --- a/pkgs/core/schemas/0056_table_worker_functions.sql +++ b/pkgs/core/schemas/0056_table_worker_functions.sql @@ -3,6 +3,8 @@ create table if not exists pgflow.worker_functions ( function_name text not null primary key, + start_mode text not null default 'http', + constraint worker_functions_start_mode_check check (start_mode in ('http', 'process')), enabled boolean not null default true, debounce interval not null default '6 seconds' check (debounce >= '1 second'), @@ -17,6 +19,9 @@ comment on table pgflow.worker_functions is comment on column pgflow.worker_functions.function_name is 'Name of the Supabase Edge Function'; +comment on column pgflow.worker_functions.start_mode is +'How this worker function is started: http workers are pinged by ensure_workers(), process workers self-start'; + comment on column pgflow.worker_functions.enabled is 'Whether ensure_workers() should ping this function'; diff --git a/pkgs/core/schemas/0057_function_track_worker_function.sql b/pkgs/core/schemas/0057_function_track_worker_function.sql index ea8f7e732..e0f77e09d 100644 --- a/pkgs/core/schemas/0057_function_track_worker_function.sql +++ b/pkgs/core/schemas/0057_function_track_worker_function.sql @@ -2,16 +2,18 @@ -- Registers an edge function for monitoring by ensure_workers() cron create or replace function pgflow.track_worker_function( - function_name text + function_name text, + start_mode text default 'http' ) returns void language sql as $$ - insert into pgflow.worker_functions (function_name, updated_at) - values (track_worker_function.function_name, clock_timestamp()) + insert into pgflow.worker_functions (function_name, start_mode, updated_at) + values (track_worker_function.function_name, track_worker_function.start_mode, clock_timestamp()) on conflict (function_name) do update set + start_mode = excluded.start_mode, updated_at = clock_timestamp(); $$; -comment on function pgflow.track_worker_function(text) is +comment on function pgflow.track_worker_function(text, text) is 'Registers an edge function for monitoring. Called by workers on startup.'; diff --git a/pkgs/core/schemas/0059_function_ensure_workers.sql b/pkgs/core/schemas/0059_function_ensure_workers.sql index 3dc3d3987..ab5774d88 100644 --- a/pkgs/core/schemas/0059_function_ensure_workers.sql +++ b/pkgs/core/schemas/0059_function_ensure_workers.sql @@ -30,15 +30,20 @@ as $$ end as base_url ), + -- Only HTTP-started worker functions are supervised via HTTP pings. + http_worker_functions as ( + select wf.function_name, wf.debounce, wf.last_invoked_at + from pgflow.worker_functions as wf + where wf.enabled = true + and wf.start_mode = 'http' + ), + -- Find functions that pass the debounce check debounce_passed as ( select wf.function_name, wf.debounce - from pgflow.worker_functions as wf - where wf.enabled = true - and ( - wf.last_invoked_at is null - or wf.last_invoked_at < now() - wf.debounce - ) + from http_worker_functions as wf + where wf.last_invoked_at is null + or wf.last_invoked_at < now() - wf.debounce ), -- Find functions that have at least one alive worker @@ -56,15 +61,12 @@ as $$ -- Production mode: only functions that pass debounce AND have no alive workers functions_to_invoke as ( select wf.function_name - from pgflow.worker_functions as wf - where wf.enabled = true - and ( - pgflow.is_local() = true -- Local: all enabled functions - or ( - -- Production: debounce + no alive workers - wf.function_name in (select dp.function_name from debounce_passed as dp) - and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw) - ) + from http_worker_functions as wf + where pgflow.is_local() = true -- Local: all enabled HTTP functions + or ( + -- Production: debounce + no alive workers + wf.function_name in (select dp.function_name from debounce_passed as dp) + and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw) ) ), diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 7d7ed85b9..e91d12a4d 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -351,6 +351,7 @@ export type Database = { enabled: boolean function_name: string last_invoked_at: string | null + start_mode: string updated_at: string } Insert: { @@ -359,6 +360,7 @@ export type Database = { enabled?: boolean function_name: string last_invoked_at?: string | null + start_mode?: string updated_at?: string } Update: { @@ -367,6 +369,7 @@ export type Database = { enabled?: boolean function_name?: string last_invoked_at?: string | null + start_mode?: string updated_at?: string } Relationships: [] @@ -656,7 +659,7 @@ export type Database = { } } track_worker_function: { - Args: { function_name: string } + Args: { function_name: string; start_mode?: string } Returns: undefined } } diff --git a/pkgs/core/supabase/migrations/20260607175525_pgflow_worker_start_mode.sql b/pkgs/core/supabase/migrations/20260607175525_pgflow_worker_start_mode.sql new file mode 100644 index 000000000..db7c9204a --- /dev/null +++ b/pkgs/core/supabase/migrations/20260607175525_pgflow_worker_start_mode.sql @@ -0,0 +1,118 @@ +-- Modify "worker_functions" table +ALTER TABLE "pgflow"."worker_functions" ADD CONSTRAINT "worker_functions_start_mode_check" CHECK (start_mode = ANY (ARRAY['http'::text, 'process'::text])), ADD COLUMN "start_mode" text NOT NULL DEFAULT 'http'; +-- Set comment to column: "start_mode" on table: "worker_functions" +COMMENT ON COLUMN "pgflow"."worker_functions"."start_mode" IS 'How this worker function is started: http workers are pinged by ensure_workers(), process workers self-start'; +-- Modify "ensure_workers" function +CREATE OR REPLACE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$ +with + -- Detect environment + env as ( + select pgflow.is_local() as is_local + ), + + -- Get credentials: Local mode uses hardcoded URL, production uses vault secrets + -- Empty strings are treated as NULL using nullif() + -- pgflow_auth_secret takes priority over supabase_service_role_key for production auth + credentials as ( + select + case + when (select is_local from env) then null + else coalesce( + nullif((select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_auth_secret'), ''), + nullif((select decrypted_secret from vault.decrypted_secrets where name = 'supabase_service_role_key'), '') + ) + end as service_role_key, + case + when (select is_local from env) then 'http://kong:8000/functions/v1' + else (select 'https://' || nullif(decrypted_secret, '') || '.supabase.co/functions/v1' from vault.decrypted_secrets where name = 'supabase_project_id') + end as base_url + ), + + -- Only HTTP-started worker functions are supervised via HTTP pings. + http_worker_functions as ( + select wf.function_name, wf.debounce, wf.last_invoked_at + from pgflow.worker_functions as wf + where wf.enabled = true + and wf.start_mode = 'http' + ), + + -- Find functions that pass the debounce check + debounce_passed as ( + select wf.function_name, wf.debounce + from http_worker_functions as wf + where wf.last_invoked_at is null + or wf.last_invoked_at < now() - wf.debounce + ), + + -- Find functions that have at least one alive worker + functions_with_alive_workers as ( + select distinct w.function_name + from pgflow.workers as w + inner join debounce_passed as dp on w.function_name = dp.function_name + where w.stopped_at is null + and w.deprecated_at is null + and w.last_heartbeat_at > now() - dp.debounce + ), + + -- Determine which functions should be invoked + -- Local mode: all enabled functions (bypass debounce AND alive workers check) + -- Production mode: only functions that pass debounce AND have no alive workers + functions_to_invoke as ( + select wf.function_name + from http_worker_functions as wf + where pgflow.is_local() = true -- Local: all enabled HTTP functions + or ( + -- Production: debounce + no alive workers + wf.function_name in (select dp.function_name from debounce_passed as dp) + and wf.function_name not in (select faw.function_name from functions_with_alive_workers as faw) + ) + ), + + -- Make HTTP requests and capture request_ids + http_requests as ( + select + fti.function_name, + net.http_post( + url => c.base_url || '/' || fti.function_name, + headers => case + when e.is_local then '{}'::jsonb + else jsonb_build_object( + 'Content-Type', 'application/json', + 'Authorization', 'Bearer ' || c.service_role_key + ) + end, + body => '{}'::jsonb + ) as request_id + from functions_to_invoke as fti + cross join credentials as c + cross join env as e + where c.base_url is not null + and (e.is_local or c.service_role_key is not null) + ), + + -- Update last_invoked_at for invoked functions + updated as ( + update pgflow.worker_functions as wf + set last_invoked_at = clock_timestamp() + from http_requests as hr + where wf.function_name = hr.function_name + returning wf.function_name + ) + + select u.function_name, true as invoked, hr.request_id + from updated as u + inner join http_requests as hr on u.function_name = hr.function_name +$$; +-- Drop "track_worker_function" function +DROP FUNCTION "pgflow"."track_worker_function" (text); +-- Create "track_worker_function" function +CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text, "start_mode" text DEFAULT 'http') RETURNS void LANGUAGE sql AS $$ +insert into pgflow.worker_functions (function_name, start_mode, updated_at) + values (track_worker_function.function_name, track_worker_function.start_mode, clock_timestamp()) + on conflict (function_name) + do update set + start_mode = excluded.start_mode, + updated_at = clock_timestamp(); +$$; +-- Set comment to function: "track_worker_function" +COMMENT ON FUNCTION "pgflow"."track_worker_function" (text, text) IS 'Registers an edge function for monitoring. Called by workers on startup.'; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 60c45edf9..314597418 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w= +h1:ugGS1dSXEdS8KOgQWhOdxWCovACZT4bTwWa9HVb7F70= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -19,3 +19,4 @@ h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= 20260214181656_pgflow_step_conditions.sql h1:rHQnXCeZ/QGxPlChdTMxumtsTtYHr1ej183Dd+auw34= +20260607175525_pgflow_worker_start_mode.sql h1:PFAfoGaHe5stKF7YAFg6AqBxmRisqDvV60vVpnnVdBE= diff --git a/pkgs/core/supabase/tests/ensure_workers/skips_process_start_mode.test.sql b/pkgs/core/supabase/tests/ensure_workers/skips_process_start_mode.test.sql new file mode 100644 index 000000000..049ef9f16 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_workers/skips_process_start_mode.test.sql @@ -0,0 +1,29 @@ +begin; +select plan(3); + +select pgflow_tests.reset_db(); + +select pgflow.track_worker_function('http-worker', 'http'); +select pgflow.track_worker_function('process-worker', 'process'); + +select is( + (select count(*)::int from pgflow.ensure_workers() where function_name = 'process-worker'), + 0, + 'ensure_workers skips process workers in production mode' +); + +select ok( + (select count(*)::int from pgflow.ensure_workers() where function_name = 'http-worker') >= 0, + 'ensure_workers still evaluates http workers' +); + +select set_config('app.settings.is_local', 'true', true); + +select is( + (select count(*)::int from pgflow.ensure_workers() where function_name = 'process-worker'), + 0, + 'ensure_workers skips process workers in local mode' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/track_worker_function/start_mode.test.sql b/pkgs/core/supabase/tests/track_worker_function/start_mode.test.sql new file mode 100644 index 000000000..a475d4f10 --- /dev/null +++ b/pkgs/core/supabase/tests/track_worker_function/start_mode.test.sql @@ -0,0 +1,38 @@ +begin; +select plan(4); + +select pgflow_tests.reset_db(); + +select pgflow.track_worker_function('default-http-worker'); + +select is( + (select start_mode from pgflow.worker_functions where function_name = 'default-http-worker'), + 'http', + 'track_worker_function defaults start_mode to http' +); + +select pgflow.track_worker_function('process-worker', 'process'); + +select is( + (select start_mode from pgflow.worker_functions where function_name = 'process-worker'), + 'process', + 'track_worker_function stores explicit process start_mode' +); + +select pgflow.track_worker_function('mode-update-worker', 'http'); +select pgflow.track_worker_function('mode-update-worker', 'process'); + +select is( + (select start_mode from pgflow.worker_functions where function_name = 'mode-update-worker'), + 'process', + 'track_worker_function updates start_mode on conflict' +); + +select throws_ok( + $$ select pgflow.track_worker_function('invalid-worker', 'invalid') $$, + 'new row for relation "worker_functions" violates check constraint "worker_functions_start_mode_check"', + 'track_worker_function rejects unsupported start_mode values' +); + +select * from finish(); +rollback; diff --git a/pkgs/edge-worker/src/core/Queries.ts b/pkgs/edge-worker/src/core/Queries.ts index 55b9fcdf7..58e0dc241 100644 --- a/pkgs/edge-worker/src/core/Queries.ts +++ b/pkgs/edge-worker/src/core/Queries.ts @@ -1,5 +1,5 @@ import type postgres from 'postgres'; -import type { WorkerRow } from './types.js'; +import type { WorkerRow, WorkerStartMode } from './types.js'; import type { FlowShape, Json } from '@pgflow/dsl'; export type EnsureFlowCompiledStatus = 'compiled' | 'verified' | 'recompiled' | 'mismatch'; @@ -81,9 +81,9 @@ export class Queries { * Called by workers on startup. Sets last_invoked_at to prevent cron from * pinging during startup (debounce). */ - async trackWorkerFunction(functionName: string): Promise { + async trackWorkerFunction(functionName: string, startMode: WorkerStartMode = 'http'): Promise { await this.sql` - SELECT pgflow.track_worker_function(${functionName}) + SELECT pgflow.track_worker_function(${functionName}, ${startMode}) `; } diff --git a/pkgs/edge-worker/src/core/WorkerLifecycle.ts b/pkgs/edge-worker/src/core/WorkerLifecycle.ts index e6ba85b15..0eaa3cc92 100644 --- a/pkgs/edge-worker/src/core/WorkerLifecycle.ts +++ b/pkgs/edge-worker/src/core/WorkerLifecycle.ts @@ -30,7 +30,8 @@ export class WorkerLifecycle implements ILifecycle { this.workerState.transitionTo(States.Starting); // Register this edge function for monitoring by ensure_workers() cron. - await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName); + const startMode = workerBootstrap.startMode ?? 'http'; + await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName, startMode); this.logger.debug(`Ensuring queue '${this.queue.queueName}' exists...`); await this.queue.safeCreate(); diff --git a/pkgs/edge-worker/src/core/types.ts b/pkgs/edge-worker/src/core/types.ts index b3f5bc331..dcd825fdf 100644 --- a/pkgs/edge-worker/src/core/types.ts +++ b/pkgs/edge-worker/src/core/types.ts @@ -48,7 +48,10 @@ export type WorkerRow = { function_name: string; }; +export type WorkerStartMode = 'http' | 'process'; + export interface WorkerBootstrap { edgeFunctionName: string; workerId: string; + startMode?: WorkerStartMode; } diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 99b5362c0..664578aad 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -52,7 +52,8 @@ export class FlowWorkerLifecycle implements ILifecycle { this._edgeFunctionName = workerBootstrap.edgeFunctionName; // Register this edge function for monitoring by ensure_workers() cron. - await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName); + const startMode = workerBootstrap.startMode ?? 'http'; + await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName, startMode); // Compile/verify flow as part of Starting (before registering worker) let compilationStatus: CompilationStatus = 'verified'; diff --git a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts index 74c76b3d3..22386419d 100644 --- a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts +++ b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts @@ -234,6 +234,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter { + override trackWorkerFunction(functionName: string, startMode = 'http'): Promise { this.trackWorkerFunctionCallCount++; this.lastTrackedFunctionName = functionName; + this.lastTrackedStartMode = startMode; return Promise.resolve(); } } @@ -259,4 +261,25 @@ Deno.test('FlowWorkerLifecycle - calls trackWorkerFunction during startup', asyn assertEquals(mockQueries.trackWorkerFunctionCallCount, 1, 'trackWorkerFunction should be called once'); assertEquals(mockQueries.lastTrackedFunctionName, 'my-edge-function', 'trackWorkerFunction should be called with correct function name'); + assertEquals(mockQueries.lastTrackedStartMode, 'http', 'trackWorkerFunction should default start mode to http'); +}); + +Deno.test('FlowWorkerLifecycle - passes process start mode during startup', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger); + + const workerBootstrap = { + workerId: 'test-worker-id', + edgeFunctionName: 'my-edge-function', + startMode: 'process' as const, + }; + + await lifecycle.acknowledgeStart(workerBootstrap); + + assertEquals(mockQueries.trackWorkerFunctionCallCount, 1, 'trackWorkerFunction should be called once'); + assertEquals(mockQueries.lastTrackedFunctionName, 'my-edge-function', 'trackWorkerFunction should be called with correct function name'); + assertEquals(mockQueries.lastTrackedStartMode, 'process', 'trackWorkerFunction should receive process start mode'); }); diff --git a/pkgs/edge-worker/tests/unit/Queries.test.ts b/pkgs/edge-worker/tests/unit/Queries.test.ts index 5c3becab3..c879d4272 100644 --- a/pkgs/edge-worker/tests/unit/Queries.test.ts +++ b/pkgs/edge-worker/tests/unit/Queries.test.ts @@ -25,11 +25,21 @@ Deno.test('Queries.trackWorkerFunction - calls correct SQL function', async () = await queries.trackWorkerFunction('my-edge-function'); assertEquals(calls.length, 1); - assertEquals(calls[0].values, ['my-edge-function']); + assertEquals(calls[0].values, ['my-edge-function', 'http']); // Check that query references the correct function assertEquals(calls[0].query.includes('pgflow.track_worker_function'), true); }); +Deno.test('Queries.trackWorkerFunction - passes explicit process start mode', async () => { + const { mockSql, calls } = createMockSql(); + const queries = new Queries(mockSql); + + await queries.trackWorkerFunction('process-worker', 'process'); + + assertEquals(calls.length, 1); + assertEquals(calls[0].values, ['process-worker', 'process']); +}); + Deno.test('Queries.trackWorkerFunction - handles special characters in function name', async () => { const { mockSql, calls } = createMockSql(); const queries = new Queries(mockSql); @@ -37,7 +47,7 @@ Deno.test('Queries.trackWorkerFunction - handles special characters in function await queries.trackWorkerFunction('my_function-with-special_chars'); assertEquals(calls.length, 1); - assertEquals(calls[0].values, ['my_function-with-special_chars']); + assertEquals(calls[0].values, ['my_function-with-special_chars', 'http']); }); Deno.test('Queries.markWorkerStopped - calls correct SQL function', async () => { diff --git a/pkgs/edge-worker/tests/unit/WorkerLifecycle.deprecation.test.ts b/pkgs/edge-worker/tests/unit/WorkerLifecycle.deprecation.test.ts index a4289d2ba..f428b3724 100644 --- a/pkgs/edge-worker/tests/unit/WorkerLifecycle.deprecation.test.ts +++ b/pkgs/edge-worker/tests/unit/WorkerLifecycle.deprecation.test.ts @@ -18,6 +18,7 @@ class MockQueries extends Queries { public sendHeartbeatCallCount = 0; public nextResult: { is_deprecated: boolean } = { is_deprecated: false }; public workerStopped = false; + public trackedWorkerFunctions: Array<[string, string]> = []; constructor() { // Pass null as sql since we'll override all methods @@ -51,7 +52,8 @@ class MockQueries extends Queries { return Promise.resolve(workerRow); } - override trackWorkerFunction(_functionName: string): Promise { + override trackWorkerFunction(functionName: string, startMode = 'http'): Promise { + this.trackedWorkerFunctions.push([functionName, startMode]); return Promise.resolve(); } @@ -60,6 +62,27 @@ class MockQueries extends Queries { } } +Deno.test('WorkerLifecycle - passes process start mode to worker tracking', async () => { + const mockQueries = new MockQueries(); + const mockQueue = new MockQueue('test-queue'); + const lifecycle = new WorkerLifecycle( + mockQueries, + mockQueue, + logger, + { heartbeatInterval: 0 } + ); + + const workerBootstrap = { + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + startMode: 'process' as const, + }; + + await lifecycle.acknowledgeStart(workerBootstrap); + + assertEquals(mockQueries.trackedWorkerFunctions, [['test-function', 'process']]); +}); + // Mock Queue class MockQueue extends Queue { constructor(queueName: string) { diff --git a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts index 0c0506cab..c77cbc404 100644 --- a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts +++ b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts @@ -323,6 +323,40 @@ Deno.test({ }, }); +Deno.test({ + name: 'HTTP startup passes http start mode to worker bootstrap', + sanitizeResources: false, + fn: async () => { + let serveHandler: ((req: Request) => Response | Promise) | null = null; + let bootstrap: unknown = null; + + const deps = createMockDeps({ + serve: (h) => { + serveHandler = h; + }, + }); + + const adapter = new SupabasePlatformAdapter(undefined, deps); + await adapter.startWorker(() => ({ + startOnlyOnce: (workerBootstrap: unknown) => { + bootstrap = workerBootstrap; + }, + stop: () => Promise.resolve(), + } as unknown as Worker)); + + const handler = serveHandler as unknown as (req: Request) => Response | Promise; + await handler(new Request('http://localhost/functions/v1/my-worker', { + headers: { authorization: 'Bearer test-service-key' }, + })); + + assertEquals(bootstrap, { + edgeFunctionName: 'functions/v1/my-worker', + workerId: 'test-exec-id', + startMode: 'http', + }); + }, +}); + Deno.test({ name: 'stopWorker aborts the shutdown signal', sanitizeResources: false,