Skip to content

Commit ac8a858

Browse files
authored
feat(edge-worker): add workerConfig to handler execution context (#203)
Adds workerConfig to handler execution context, allowing handlers to access worker configuration for intelligent decision-making based on retry limits, concurrency settings, and other parameters. Key Changes Added workerConfig to MessageExecution and StepTaskExecution contexts Deep frozen config prevents accidental handler modifications while keeping worker config mutable Performance optimized - config frozen once at startup and cached for reuse Reorganized configuration handling with cleaner factory methods and consolidated defaults Simplified EdgeWorker API by removing unnecessary union types Environment variables always sourced from platform (users cannot override)
1 parent 7109987 commit ac8a858

File tree

15 files changed

+864
-255
lines changed

15 files changed

+864
-255
lines changed

.changeset/bright-clocks-bet.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
'@pgflow/edge-worker': patch
3+
---
4+
5+
Add workerConfig to handler execution context
6+
7+
Handlers can now access worker configuration through `context.workerConfig` to make intelligent decisions based on retry limits, concurrency settings, and other worker parameters. The config is deeply frozen to prevent accidental modifications while remaining mutable by the worker itself for future features.
8+
9+
Key improvements:
10+
- Added `workerConfig` to MessageExecution and StepTaskExecution contexts
11+
- Config is cached and frozen once at startup for optimal performance
12+
- Reorganized configuration handling with cleaner factory methods
13+
- Simplified EdgeWorker API by removing unnecessary union types
14+
- Environment variables always sourced from platform (users cannot override)

pkgs/edge-worker/src/EdgeWorker.ts

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,6 @@ import type { MessageHandlerFn } from './queue/types.js';
1313
import type { AnyFlow } from '@pgflow/dsl';
1414
import type { CompatibleFlow } from './types/flowCompatibility.js';
1515

16-
/**
17-
* Configuration options for the EdgeWorker.
18-
*/
19-
export type EdgeWorkerConfig =
20-
| Omit<QueueWorkerConfig, 'sql'>
21-
| Omit<FlowWorkerConfig, 'sql'>;
2216

2317
/**
2418
* EdgeWorker is the main entry point for creating and starting edge workers.
@@ -88,17 +82,17 @@ export class EdgeWorker {
8882
TFlow extends AnyFlow = AnyFlow
8983
>(
9084
handlerOrFlow: MessageHandlerFn<TPayload> | TFlow,
91-
config: EdgeWorkerConfig = {}
92-
): Promise<void> {
85+
config?: Omit<QueueWorkerConfig, 'sql'> | Omit<FlowWorkerConfig, 'sql'>
86+
): Promise<PlatformAdapter> {
9387
if (typeof handlerOrFlow === 'function') {
94-
await this.startQueueWorker(
88+
return await this.startQueueWorker(
9589
handlerOrFlow as MessageHandlerFn<TPayload>,
96-
config as QueueWorkerConfig
90+
config
9791
);
9892
} else {
99-
await this.startFlowWorker(
93+
return await this.startFlowWorker(
10094
handlerOrFlow as TFlow,
101-
config as FlowWorkerConfig
95+
config
10296
);
10397
}
10498
}
@@ -142,21 +136,15 @@ export class EdgeWorker {
142136
static async startQueueWorker<TPayload extends Json = Json>(
143137
handler: MessageHandlerFn<TPayload>,
144138
config: QueueWorkerConfig = {}
145-
) {
139+
): Promise<PlatformAdapter> {
146140
this.ensureFirstCall();
147141

148142
// First, create the adapter
149143
this.platform = await createAdapter();
150144

151-
// Apply default values to the config
145+
// Add platform-specific values to the config
152146
const workerConfig: QueueWorkerConfig = {
153147
...config,
154-
queueName: config.queueName || 'tasks',
155-
maxConcurrent: config.maxConcurrent ?? 10,
156-
maxPgConnections: config.maxPgConnections ?? 4,
157-
maxPollSeconds: config.maxPollSeconds ?? 5,
158-
pollIntervalMs: config.pollIntervalMs ?? 200,
159-
visibilityTimeout: config.visibilityTimeout ?? 10,
160148
connectionString:
161149
config.connectionString || this.platform.connectionString,
162150
env: this.platform.env,
@@ -165,6 +153,8 @@ export class EdgeWorker {
165153
await this.platform.startWorker((createLoggerFn) => {
166154
return createQueueWorker(handler, workerConfig, createLoggerFn, this.platform);
167155
});
156+
157+
return this.platform;
168158
}
169159

170160
/**
@@ -196,27 +186,24 @@ export class EdgeWorker {
196186
static async startFlowWorker<TFlow extends AnyFlow>(
197187
flow: CompatibleFlow<TFlow>,
198188
config: FlowWorkerConfig = {}
199-
) {
189+
): Promise<PlatformAdapter> {
200190
this.ensureFirstCall();
201191

202192
// First, create the adapter
203193
this.platform = await createAdapter();
204194

205-
// Apply default values to the config
195+
// Add platform-specific values to the config
206196
const workerConfig: FlowWorkerConfig = {
207197
...config,
208-
maxConcurrent: config.maxConcurrent ?? 10,
209-
maxPgConnections: config.maxPgConnections ?? 4,
210-
batchSize: config.batchSize ?? 10,
211-
maxPollSeconds: config.maxPollSeconds ?? 2,
212-
pollIntervalMs: config.pollIntervalMs ?? 100,
213198
connectionString:
214199
config.connectionString || this.platform.connectionString,
215200
};
216201

217202
await this.platform.startWorker((createLoggerFn) => {
218203
return createFlowWorker(flow, workerConfig, createLoggerFn, this.platform);
219204
});
205+
206+
return this.platform;
220207
}
221208

222209
/**

pkgs/edge-worker/src/core/context.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { BaseContext, AnyFlow, AllStepInputs } from '@pgflow/dsl';
33
import type { Json } from './types.js';
44
import type { PgmqMessageRecord } from '../queue/types.js';
55
import type { StepTaskRecord } from '../flow/types.js';
6+
import type { QueueWorkerConfig, FlowWorkerConfig } from './workerConfigTypes.js';
67

78
/* ──────────────────────────────────────────────────────────────────────
89
1. PLATFORM LAYER
@@ -22,11 +23,13 @@ export type PlatformContext<
2223

2324
export interface MessageExecution<TPayload extends Json = Json> {
2425
rawMessage: PgmqMessageRecord<TPayload>;
26+
workerConfig: Readonly<Omit<QueueWorkerConfig, 'sql'>>;
2527
}
2628

2729
export interface StepTaskExecution<TFlow extends AnyFlow = AnyFlow> {
2830
rawMessage: PgmqMessageRecord<AllStepInputs<TFlow>>;
2931
stepTask : StepTaskRecord<TFlow>;
32+
workerConfig: Readonly<Omit<FlowWorkerConfig, 'sql'>>;
3033
}
3134

3235
/** Message handler context for any platform */
@@ -51,6 +54,20 @@ export interface StepTaskWithMessage<TFlow extends AnyFlow> {
5154
task : StepTaskRecord<TFlow>;
5255
}
5356

57+
import { deepClone, deepFreeze } from './deepUtils.js';
58+
59+
/**
60+
* Creates a context-safe version of worker config by excluding sql connection,
61+
* deep cloning, and deep freezing the result to prevent modification by handlers
62+
*/
63+
export function createContextSafeConfig<T extends Record<string, unknown>>(
64+
config: T
65+
): Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T> {
66+
const { sql: _sql, ...safeConfig } = config as T & { sql?: unknown };
67+
const clonedConfig = deepClone(safeConfig);
68+
return deepFreeze(clonedConfig) as Readonly<T extends { sql: unknown } ? Omit<T, 'sql'> : T>;
69+
}
70+
5471
/* ──────────────────────────────────────────────────────────────────────
5572
4. LEGACY COMPATIBILITY (for backward compatibility only)
5673
--------------------------------------------------------------------- */
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Deep clone an object recursively
3+
*/
4+
export function deepClone<T>(obj: T): T {
5+
if (obj === null || typeof obj !== 'object') {
6+
return obj;
7+
}
8+
9+
if (obj instanceof Date) {
10+
return new Date(obj.getTime()) as T;
11+
}
12+
13+
const source = obj as Record<PropertyKey, unknown>;
14+
const target = Array.isArray(obj) ? [] as unknown[] : {} as Record<PropertyKey, unknown>;
15+
16+
for (const key of Reflect.ownKeys(source)) {
17+
(target as Record<PropertyKey, unknown>)[key] = deepClone(source[key]);
18+
}
19+
20+
return target as T;
21+
}
22+
23+
/**
24+
* Deep freeze an object recursively to prevent any modifications
25+
*/
26+
export function deepFreeze<T>(obj: T): T {
27+
// Get property names
28+
const propNames = Reflect.ownKeys(obj as object);
29+
30+
// Freeze properties before freezing self
31+
for (const name of propNames) {
32+
const value = (obj as Record<PropertyKey, unknown>)[name];
33+
34+
if ((value && typeof value === 'object') || typeof value === 'function') {
35+
deepFreeze(value);
36+
}
37+
}
38+
39+
return Object.freeze(obj);
40+
}

pkgs/edge-worker/src/core/supabase-test-utils.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import type { PgmqMessageRecord } from '../queue/types.js';
66
import type {
77
StepTaskWithMessage
88
} from './context.js';
9+
import { createContextSafeConfig } from './context.js';
10+
import type { QueueWorkerConfig, FlowWorkerConfig } from './workerConfigTypes.js';
911
import type { SupabaseEnv } from '@pgflow/dsl/supabase';
1012
import { createServiceSupabaseClient } from './supabase-utils.js';
1113

@@ -18,8 +20,9 @@ export function createQueueWorkerContext<TPayload extends Json>(params: {
1820
sql: Sql;
1921
abortSignal: AbortSignal;
2022
rawMessage: PgmqMessageRecord<TPayload>;
23+
workerConfig?: Readonly<Omit<QueueWorkerConfig, 'sql'>>;
2124
}) {
22-
const { env, sql, abortSignal, rawMessage } = params;
25+
const { env, sql, abortSignal, rawMessage, workerConfig } = params;
2326

2427
// Create Supabase client if env vars exist, otherwise create mock
2528
const supabase = env.SUPABASE_URL && env.SUPABASE_SERVICE_ROLE_KEY
@@ -36,6 +39,7 @@ export function createQueueWorkerContext<TPayload extends Json>(params: {
3639

3740
// Message execution context
3841
rawMessage,
42+
...(workerConfig && { workerConfig: createContextSafeConfig(workerConfig) }),
3943

4044
// Supabase-specific resources (always present in Phase 1)
4145
sql,
@@ -53,8 +57,9 @@ export function createFlowWorkerContext<TFlow extends AnyFlow = AnyFlow>(params:
5357
sql: Sql;
5458
abortSignal: AbortSignal;
5559
taskWithMessage?: StepTaskWithMessage<TFlow>;
60+
workerConfig?: Readonly<Omit<FlowWorkerConfig, 'sql'>>;
5661
}) {
57-
const { env, sql, abortSignal, taskWithMessage } = params;
62+
const { env, sql, abortSignal, taskWithMessage, workerConfig } = params;
5863

5964
// Create Supabase client if env vars exist, otherwise create mock
6065
const supabase = env.SUPABASE_URL && env.SUPABASE_SERVICE_ROLE_KEY
@@ -76,6 +81,7 @@ export function createFlowWorkerContext<TFlow extends AnyFlow = AnyFlow>(params:
7681
// Step task execution context
7782
rawMessage: taskWithMessage.message,
7883
stepTask: taskWithMessage.task,
84+
...(workerConfig && { workerConfig: createContextSafeConfig(workerConfig) }),
7985

8086
// Supabase-specific resources (always present in Phase 1)
8187
sql,

0 commit comments

Comments
 (0)