Skip to content

Commit c037101

Browse files
committed
Multiple proxies
Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com>
1 parent c130381 commit c037101

File tree

10 files changed

+310
-86
lines changed

10 files changed

+310
-86
lines changed
Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
import type { QueueDriver } from './queue-drivers/types.js';
22

3-
export interface QueueConfig {
3+
export type PostgresWorldConfig = {
4+
securityToken: string;
45
connectionString: string;
5-
jobPrefix?: string;
6-
queueConcurrency?: number;
7-
}
8-
9-
export type PostgresWorldConfig = QueueConfig & {
10-
queueFactory: (config: QueueConfig) => QueueDriver;
6+
queueFactory: () => QueueDriver;
117
};

packages/world-postgres/src/index.ts

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import createPostgres from 'postgres';
33
import type { PostgresWorldConfig } from './config.js';
44
import { createClient, type Drizzle } from './drizzle/index.js';
55
import { createQueue } from './queue.js';
6-
import { createPgBossQueue } from './queue-drivers/pg-boss.js';
6+
import { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js';
77
import {
88
createEventsStorage,
99
createHooksStorage,
@@ -12,33 +12,21 @@ import {
1212
} from './storage.js';
1313
import { createStreamer } from './streamer.js';
1414

15-
function createStorage(drizzle: Drizzle): Storage {
16-
return {
17-
runs: createRunsStorage(drizzle),
18-
events: createEventsStorage(drizzle),
19-
hooks: createHooksStorage(drizzle),
20-
steps: createStepsStorage(drizzle),
21-
};
22-
}
15+
export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world';
2316

2417
export function createWorld(
2518
config: PostgresWorldConfig = {
26-
connectionString:
27-
process.env.WORKFLOW_POSTGRES_URL ||
28-
'postgres://world:world@localhost:5432/world',
29-
jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX,
30-
queueConcurrency:
31-
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) ||
32-
10,
33-
queueFactory: (config) => createPgBossQueue(config),
19+
connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL,
20+
securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'secret',
21+
queueFactory: defaultQueueFactory,
3422
}
3523
): World & { start(): Promise<void> } {
36-
const queueDriver = config.queueFactory(config);
24+
const queueDriver = config.queueFactory();
3725
const postgres = createPostgres(config.connectionString);
3826
const drizzle = createClient(postgres);
3927

40-
const queue = createQueue(queueDriver);
4128
const storage = createStorage(drizzle);
29+
const queue = createQueue(queueDriver, config.securityToken);
4230
const streamer = createStreamer(postgres, drizzle);
4331

4432
return {
@@ -51,6 +39,32 @@ export function createWorld(
5139
};
5240
}
5341

54-
// Re-export schema for users who want to extend or inspect the database schema
42+
function createStorage(drizzle: Drizzle): Storage {
43+
return {
44+
runs: createRunsStorage(drizzle),
45+
events: createEventsStorage(drizzle),
46+
hooks: createHooksStorage(drizzle),
47+
steps: createStepsStorage(drizzle),
48+
};
49+
}
50+
51+
function defaultQueueFactory() {
52+
return createPgBossHttpProxy({
53+
securityToken:
54+
process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe',
55+
baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL,
56+
connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL,
57+
queueConcurrency:
58+
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) ||
59+
10,
60+
});
61+
}
62+
5563
export type { PostgresWorldConfig } from './config.js';
64+
// Re-export schema for users who want to extend or inspect the database schema
5665
export * from './drizzle/schema.js';
66+
export type { PgBossFunctionProxyConfig } from './queue-drivers/pgboss-function-proxy.js';
67+
export { createPgBossFunctionProxy } from './queue-drivers/pgboss-function-proxy.js';
68+
export type { PgBossHttpProxyConfig } from './queue-drivers/pgboss-http-proxy.js';
69+
// Re-export queue drivers for custom configurations
70+
export { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js';

packages/world-postgres/src/queue-drivers/pg-boss.ts renamed to packages/world-postgres/src/queue-drivers/pgboss-base.ts

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,24 @@
11
import PgBoss from 'pg-boss';
2-
import type { QueueConfig } from '../config.js';
3-
import { MessageData } from './types.js';
4-
import { proxyStep, proxyWorkflow } from './wkf-proxy.js';
2+
import { MessageData, type QueueDriver } from './types.js';
53

6-
export function createPgBossQueue(config: QueueConfig) {
4+
export interface ProxyFunctions {
5+
proxyWorkflow: (message: MessageData) => Promise<Response>;
6+
proxyStep: (message: MessageData) => Promise<Response>;
7+
}
8+
9+
/**
10+
* Base QueueDriver implementation using pg-boss for job management.
11+
* Accepts a proxy implementation that handles the actual workflow/step execution.
12+
* This eliminates code duplication between HTTP and function-based proxies.
13+
*/
14+
export function createPgBossQueue(
15+
config: {
16+
connectionString: string;
17+
jobPrefix?: string;
18+
queueConcurrency?: number;
19+
},
20+
proxy: ProxyFunctions
21+
): QueueDriver {
722
let startPromise: Promise<unknown> | null = null;
823
const boss = new PgBoss(config.connectionString);
924

@@ -51,7 +66,22 @@ export function createPgBossQueue(config: QueueConfig) {
5166
console.log(`[${job.id}] running: ${message.queueName}`);
5267

5368
try {
54-
await proxyWorkflow(message);
69+
const response = await proxy.proxyWorkflow(message);
70+
71+
// TODO: Properly handle sleep
72+
if (response.status === 503) {
73+
const body = (await response.json()) as {
74+
timeoutSeconds?: number;
75+
};
76+
if (body.timeoutSeconds) {
77+
throw new Error(`Retry after ${body.timeoutSeconds}s`);
78+
}
79+
}
80+
81+
if (!response.ok) {
82+
const text = await response.text();
83+
throw new Error(`Workflow failed: ${text}`);
84+
}
5585
} catch (error) {
5686
console.error(
5787
`[${job.id}] Error handling workflow: ${message.queueName}`,
@@ -67,7 +97,21 @@ export function createPgBossQueue(config: QueueConfig) {
6797
console.log(`[${job.id}] running: ${message.queueName}`);
6898

6999
try {
70-
await proxyStep(message);
100+
const response = await proxy.proxyStep(message);
101+
102+
if (response.status === 503) {
103+
const body = (await response.json()) as {
104+
timeoutSeconds?: number;
105+
};
106+
if (body.timeoutSeconds) {
107+
throw new Error(`Retry after ${body.timeoutSeconds}s`);
108+
}
109+
}
110+
111+
if (!response.ok) {
112+
const text = await response.text();
113+
throw new Error(`Step failed: ${text}`);
114+
}
71115
} catch (error) {
72116
console.error(
73117
`[${job.id}] Error handling step: ${message.queueName}`,
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { createPgBossQueue } from './pgboss-base.js';
2+
import type { QueueDriver } from './types.js';
3+
import { createFunctionProxy } from './wkf-function-proxy.js';
4+
5+
export interface PgBossFunctionProxyConfig {
6+
connectionString: string;
7+
securityToken: string;
8+
jobPrefix?: string;
9+
queueConcurrency?: number;
10+
workflowEntrypoint: (request: Request) => Promise<Response>;
11+
stepEntrypoint: (request: Request) => Promise<Response>;
12+
}
13+
14+
/**
15+
* QueueDriver implementation using pg-boss for job management and direct function calls for execution.
16+
* Workers call entrypoint functions directly in-process without HTTP overhead.
17+
*/
18+
export function createPgBossFunctionProxy(
19+
config: PgBossFunctionProxyConfig
20+
): QueueDriver {
21+
const proxy = createFunctionProxy({
22+
securityToken: config.securityToken,
23+
workflowEntrypoint: config.workflowEntrypoint,
24+
stepEntrypoint: config.stepEntrypoint,
25+
});
26+
27+
return createPgBossQueue(
28+
{
29+
connectionString: config.connectionString,
30+
jobPrefix: config.jobPrefix,
31+
queueConcurrency: config.queueConcurrency,
32+
},
33+
proxy
34+
);
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { createPgBossQueue } from './pgboss-base.js';
2+
import type { QueueDriver } from './types.js';
3+
import { createHttpProxy } from './wkf-http-proxy.js';
4+
5+
export interface PgBossHttpProxyConfig {
6+
connectionString: string;
7+
securityToken: string;
8+
jobPrefix?: string;
9+
queueConcurrency?: number;
10+
baseUrl?: string;
11+
port?: number;
12+
}
13+
14+
/**
15+
* QueueDriver implementation using pg-boss for job management and HTTP for execution.
16+
* Workers make HTTP calls to the Next.js app's .well-known/workflow/v1/* endpoints.
17+
*/
18+
export function createPgBossHttpProxy(
19+
config: PgBossHttpProxyConfig
20+
): QueueDriver {
21+
const proxy = createHttpProxy({
22+
securityToken: config.securityToken,
23+
baseUrl: config.baseUrl,
24+
port: config.port,
25+
});
26+
27+
return createPgBossQueue(
28+
{
29+
connectionString: config.connectionString,
30+
jobPrefix: config.jobPrefix,
31+
queueConcurrency: config.queueConcurrency,
32+
},
33+
proxy
34+
);
35+
}

packages/world-postgres/src/queue-drivers/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import * as z from 'zod';
33
import { Base64Buffer } from '../zod.js';
44

55
/**
6-
/* Most queues are using JSON under the hood, so we need to base64
7-
/* encode the body to ensure binary safety maybe later we can
8-
/* have a `blobs` table for larger payloads
9-
**/
6+
* Most queues are using JSON under the hood, so we need to base64
7+
* encode the body to ensure binary safety maybe later we can
8+
* have a `blobs` table for larger payloads
9+
*/
1010
export const MessageData = z.object({
1111
id: z
1212
.string()
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { MessageData } from './types.js';
2+
3+
export interface FunctionProxyConfig {
4+
securityToken: string;
5+
stepEntrypoint: (request: Request) => Promise<Response>;
6+
workflowEntrypoint: (request: Request) => Promise<Response>;
7+
}
8+
9+
export interface FunctionProxyFunctions {
10+
proxyStep: (message: MessageData) => Promise<Response>;
11+
proxyWorkflow: (message: MessageData) => Promise<Response>;
12+
}
13+
14+
/**
15+
* Creates function-based proxy functions that call workflow/step entrypoints directly.
16+
* Workers call entrypoint functions in-process without HTTP overhead.
17+
*/
18+
export function createFunctionProxy(
19+
config: FunctionProxyConfig
20+
): FunctionProxyFunctions {
21+
const createHeaders = () => {
22+
const headers: Record<string, string> = {
23+
'Content-Type': 'application/json',
24+
'X-Workflow-Secret': config.securityToken,
25+
};
26+
27+
return headers;
28+
};
29+
30+
return {
31+
proxyWorkflow: async (message: MessageData): Promise<Response> => {
32+
const request = new Request(
33+
'https://world-postgres.local/wkf-direct-call',
34+
{
35+
method: 'POST',
36+
headers: createHeaders(),
37+
body: JSON.stringify(MessageData.encode(message)),
38+
}
39+
);
40+
41+
return config.workflowEntrypoint(request);
42+
},
43+
44+
proxyStep: async (message: MessageData): Promise<Response> => {
45+
const request = new Request(
46+
'https://world-postgres.local/wkf-direct-call',
47+
{
48+
method: 'POST',
49+
headers: createHeaders(),
50+
body: JSON.stringify(MessageData.encode(message)),
51+
}
52+
);
53+
54+
return config.stepEntrypoint(request);
55+
},
56+
};
57+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { MessageData } from './types.js';
2+
3+
export interface HttpProxyConfig {
4+
port?: number;
5+
baseUrl?: string;
6+
securityToken: string;
7+
}
8+
9+
export interface HttpProxyFunctions {
10+
proxyWorkflow: (message: MessageData) => Promise<Response>;
11+
proxyStep: (message: MessageData) => Promise<Response>;
12+
}
13+
14+
/**
15+
* Creates HTTP-based proxy functions that call the Next.js app's workflow endpoints.
16+
* Workers communicate with the app via HTTP fetch to .well-known/workflow/v1/* endpoints.
17+
*/
18+
export function createHttpProxy(config: HttpProxyConfig): HttpProxyFunctions {
19+
const resolveBaseUrl = (): string => {
20+
if (config.baseUrl) return config.baseUrl;
21+
if (config.port) return `http://localhost:${config.port}`;
22+
return 'http://localhost:3000';
23+
};
24+
25+
const createHeaders = () => {
26+
const headers: Record<string, string> = {
27+
'Content-Type': 'application/json',
28+
'X-Workflow-Secret': config.securityToken,
29+
};
30+
31+
return headers;
32+
};
33+
34+
const baseUrl = resolveBaseUrl();
35+
36+
return {
37+
proxyWorkflow: async (message: MessageData): Promise<Response> => {
38+
return fetch(`${baseUrl}/.well-known/workflow/v1/flow`, {
39+
method: 'POST',
40+
headers: createHeaders(),
41+
body: JSON.stringify(MessageData.encode(message)),
42+
});
43+
},
44+
45+
proxyStep: async (message: MessageData): Promise<Response> => {
46+
return fetch(`${baseUrl}/.well-known/workflow/v1/step`, {
47+
method: 'POST',
48+
headers: createHeaders(),
49+
body: JSON.stringify(MessageData.encode(message)),
50+
});
51+
},
52+
};
53+
}

0 commit comments

Comments
 (0)