Skip to content

Commit f819d7c

Browse files
committed
Queue Drivers + Direct wf/steps proxy
Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com>
1 parent 21701ea commit f819d7c

File tree

6 files changed

+229
-93
lines changed

6 files changed

+229
-93
lines changed
Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
export interface PostgresWorldConfig {
1+
import type { QueueDriver } from './queue-drivers/types.js';
2+
3+
export interface QueueConfig {
24
connectionString: string;
35
jobPrefix?: string;
46
queueConcurrency?: number;
57
}
8+
9+
export type PostgresWorldConfig = QueueConfig & {
10+
queueFactory: (config: QueueConfig) => QueueDriver;
11+
};

packages/world-postgres/src/index.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type { Storage, World } from '@workflow/world';
2-
import PgBoss from 'pg-boss';
32
import createPostgres from 'postgres';
43
import type { PostgresWorldConfig } from './config.js';
54
import { createClient, type Drizzle } from './drizzle/index.js';
65
import { createQueue } from './queue.js';
6+
import { createPgBossQueue } from './queue-drivers/pg-boss.js';
77
import {
88
createEventsStorage,
99
createHooksStorage,
@@ -30,14 +30,14 @@ export function createWorld(
3030
queueConcurrency:
3131
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) ||
3232
10,
33+
queueFactory: (config) => createPgBossQueue(config),
3334
}
3435
): World & { start(): Promise<void> } {
35-
const boss = new PgBoss({
36-
connectionString: config.connectionString,
37-
});
36+
const queueDriver = config.queueFactory(config);
3837
const postgres = createPostgres(config.connectionString);
3938
const drizzle = createClient(postgres);
40-
const queue = createQueue(boss, config);
39+
40+
const queue = createQueue(queueDriver);
4141
const storage = createStorage(drizzle);
4242
const streamer = createStreamer(postgres, drizzle);
4343

@@ -46,7 +46,7 @@ export function createWorld(
4646
...streamer,
4747
...queue,
4848
async start() {
49-
await queue.start();
49+
await queueDriver.start();
5050
},
5151
};
5252
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import PgBoss from 'pg-boss';
2+
import type { QueueConfig } from '../config.js';
3+
import { MessageData } from './types.js';
4+
import { proxyStep, proxyWorkflow } from './wkf-proxy.js';
5+
6+
export function createPgBossQueue(config: QueueConfig) {
7+
let startPromise: Promise<unknown> | null = null;
8+
const boss = new PgBoss(config.connectionString);
9+
10+
const stepQueueName = 'workflow_steps';
11+
const workflowQueueName = 'workflow_flows';
12+
13+
const ensureStarted = async () => {
14+
if (!startPromise) {
15+
startPromise = boss.start().then(() => {
16+
return Promise.all([
17+
boss.createQueue(workflowQueueName),
18+
boss.createQueue(stepQueueName),
19+
]);
20+
});
21+
}
22+
23+
await startPromise;
24+
};
25+
26+
return {
27+
pushStep: async (message: MessageData) => {
28+
await ensureStarted();
29+
30+
await boss.send(stepQueueName, MessageData.encode(message), {
31+
singletonKey: message?.idempotencyKey ?? message.messageId,
32+
retryLimit: 3,
33+
});
34+
},
35+
36+
pushFlow: async (message: MessageData) => {
37+
await ensureStarted();
38+
39+
await boss.send(workflowQueueName, MessageData.encode(message), {
40+
singletonKey: message?.idempotencyKey ?? message.messageId,
41+
retryLimit: 3,
42+
});
43+
},
44+
45+
start: async () => {
46+
await ensureStarted();
47+
48+
const workflowHandler = async ([job]: PgBoss.Job[]) => {
49+
const message = MessageData.parse(job.data);
50+
51+
console.log(`[${job.id}] running: ${message.queueName}`);
52+
53+
try {
54+
await proxyWorkflow(message);
55+
} catch (error) {
56+
console.error(
57+
`[${job.id}] Error handling workflow: ${message.queueName}`,
58+
error
59+
);
60+
throw error;
61+
}
62+
};
63+
64+
const stepHandler = async ([job]: PgBoss.Job[]) => {
65+
const message = MessageData.parse(job.data);
66+
67+
console.log(`[${job.id}] running: ${message.queueName}`);
68+
69+
try {
70+
await proxyStep(message);
71+
} catch (error) {
72+
console.error(
73+
`[${job.id}] Error handling step: ${message.queueName}`,
74+
error
75+
);
76+
throw error;
77+
}
78+
};
79+
80+
for (let i = 0; i < (config.queueConcurrency || 10); i++) {
81+
await boss.work(workflowQueueName, workflowHandler);
82+
await boss.work(stepQueueName, stepHandler);
83+
}
84+
},
85+
};
86+
}
Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,30 @@
11
import { MessageId } from '@workflow/world';
22
import * as z from 'zod';
3-
import { Base64Buffer } from './zod.js';
3+
import { Base64Buffer } from '../zod.js';
44

55
/**
6-
/* pgboss is using JSON under the hood, so we need to base64 encode
7-
/* the body to ensure binary safety
8-
/* maybe later we can have a `blobs` table for larger payloads
9-
**/
6+
/* Most queues are using JSON under the hood, so we need to base64
7+
/* encode the body to ensure binary safety maybe later we can
8+
/* have a `blobs` table for larger payloads
9+
**/
1010
export const MessageData = z.object({
11-
attempt: z.number().describe('The attempt number of the message'),
12-
messageId: MessageId.describe('The unique ID of the message'),
13-
idempotencyKey: z.string().optional(),
1411
id: z
1512
.string()
1613
.describe(
1714
"The ID of the sub-queue. For workflows, it's the workflow name. For steps, it's the step name."
1815
),
16+
17+
idempotencyKey: z.string().optional(),
18+
queueName: z.string().describe('The name of the queue'),
1919
data: Base64Buffer.describe('The message that was sent'),
20+
messageId: MessageId.describe('The unique ID of the message'),
21+
attempt: z.number().describe('The attempt number of the message'),
2022
});
23+
2124
export type MessageData = z.infer<typeof MessageData>;
25+
26+
export interface QueueDriver {
27+
pushStep: (message: MessageData) => Promise<void>;
28+
pushFlow: (message: MessageData) => Promise<void>;
29+
start: () => Promise<void>;
30+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { createRequire } from 'node:module';
2+
import Path from 'node:path';
3+
import { MessageData } from './types.js';
4+
5+
const require = createRequire(Path.join(process.cwd(), 'index.js'));
6+
7+
export async function proxyWorkflow(message: MessageData) {
8+
const workflows = require(process.env.WORKFLOW_POSTGRES_FLOWS!);
9+
10+
const request = new Request('https://world-postgres.local/wkf-direct-call', {
11+
method: 'POST',
12+
headers: { 'Content-Type': 'application/json' },
13+
body: JSON.stringify(MessageData.encode(message)),
14+
});
15+
16+
return (await workflows.__wkf_entrypoint(request)) as Promise<Response>;
17+
}
18+
19+
export async function proxyStep(message: MessageData) {
20+
const steps = require(process.env.WORKFLOW_POSTGRES_STEPS!);
21+
22+
const request = new Request('https://world-postgres.local/wkf-direct-call', {
23+
method: 'POST',
24+
headers: { 'Content-Type': 'application/json' },
25+
body: JSON.stringify(MessageData.encode(message)),
26+
});
27+
28+
return (await steps.__wkf_entrypoint(request)) as Promise<Response>;
29+
}

0 commit comments

Comments
 (0)