Skip to content

Commit 53ea440

Browse files
committed
Code organisation
1 parent 802dfef commit 53ea440

File tree

13 files changed

+252
-328
lines changed

13 files changed

+252
-328
lines changed

packages/world-postgres/src/index.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@ import type { Storage, World } from '@workflow/world';
22
import createPostgres from 'postgres';
33
import type { PostgresWorldConfig } from './config.js';
44
import { createClient, type Drizzle } from './drizzle/index.js';
5+
import { createFunctionProxy } from './proxies/function-proxy.js';
6+
import { createHttpProxy } from './proxies/http-proxy.js';
57
import { createQueue } from './queue.js';
6-
import { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js';
8+
import {
9+
createPgBossFunctionProxy,
10+
createPgBossHttpProxy,
11+
} from './queue-drivers/factories.js';
12+
import { createPgBossQueue } from './queue-drivers/pgboss.js';
713
import {
814
createEventsStorage,
915
createHooksStorage,
@@ -50,10 +56,10 @@ function createStorage(drizzle: Drizzle): Storage {
5056

5157
function defaultQueueFactory() {
5258
return createPgBossHttpProxy({
53-
securityToken:
54-
process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe',
5559
baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL,
5660
connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL,
61+
securityToken:
62+
process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe',
5763
queueConcurrency:
5864
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) ||
5965
10,
@@ -63,8 +69,6 @@ function defaultQueueFactory() {
6369
export type { PostgresWorldConfig } from './config.js';
6470
// Re-export schema for users who want to extend or inspect the database schema
6571
export * from './drizzle/schema.js';
66-
export type { PgBossFunctionProxyConfig } from './queue-drivers/pgboss-function-proxy.js';
67-
export { createPgBossFunctionProxy } from './queue-drivers/pgboss-function-proxy.js';
68-
export type { PgBossHttpProxyConfig } from './queue-drivers/pgboss-http-proxy.js';
69-
// Re-export queue drivers for custom configurations
70-
export { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js';
72+
73+
export { createFunctionProxy, createHttpProxy };
74+
export { createPgBossQueue, createPgBossFunctionProxy, createPgBossHttpProxy };
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import type { MessageData } from '../queue-drivers/types.js';
2+
import type { WkfProxy } from './types.js';
3+
import { prepareRequestParams } from './utils.js';
4+
5+
export function createFunctionProxy(opts: {
6+
securityToken: string;
7+
stepEntrypoint: (request: Request) => Promise<Response>;
8+
workflowEntrypoint: (request: Request) => Promise<Response>;
9+
}): WkfProxy {
10+
return {
11+
proxyWorkflow: async (message: MessageData): Promise<Response> => {
12+
const request = new Request(
13+
'https://world-postgres.local/wkf-direct-call',
14+
prepareRequestParams(message, opts.securityToken)
15+
);
16+
17+
return opts.workflowEntrypoint(request);
18+
},
19+
20+
proxyStep: async (message: MessageData): Promise<Response> => {
21+
const request = new Request(
22+
'https://world-postgres.local/wkf-direct-call',
23+
prepareRequestParams(message, opts.securityToken)
24+
);
25+
26+
return opts.stepEntrypoint(request);
27+
},
28+
};
29+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import type { MessageData } from '../queue-drivers/types.js';
2+
import type { WkfProxy } from './types.js';
3+
import { prepareRequestParams } from './utils.js';
4+
5+
export function createHttpProxy(opts: {
6+
port?: number;
7+
baseUrl?: string;
8+
securityToken: string;
9+
}): WkfProxy {
10+
const resolveBaseUrl = (): string => {
11+
if (opts.baseUrl) return opts.baseUrl;
12+
if (opts.port) return `http://localhost:${opts.port}`;
13+
return 'http://localhost:3000';
14+
};
15+
16+
const baseUrl = resolveBaseUrl();
17+
18+
return {
19+
proxyWorkflow: async (message: MessageData): Promise<Response> => {
20+
return fetch(
21+
`${baseUrl}/.well-known/workflow/v1/flow`,
22+
prepareRequestParams(message, opts.securityToken)
23+
);
24+
},
25+
26+
proxyStep: async (message: MessageData): Promise<Response> => {
27+
return fetch(
28+
`${baseUrl}/.well-known/workflow/v1/step`,
29+
prepareRequestParams(message, opts.securityToken)
30+
);
31+
},
32+
};
33+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import type { MessageData } from '../queue-drivers/types.js';
2+
3+
export interface WkfProxy {
4+
proxyWorkflow: (message: MessageData) => Promise<Response>;
5+
proxyStep: (message: MessageData) => Promise<Response>;
6+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { MessageData } from '../queue-drivers/types.js';
2+
3+
export const prepareRequestParams = (
4+
message: MessageData,
5+
securityToken: string
6+
) => {
7+
return {
8+
method: 'POST',
9+
headers: {
10+
'Content-Type': 'application/json',
11+
'X-Workflow-Secret': securityToken,
12+
},
13+
body: JSON.stringify(MessageData.encode(message)),
14+
};
15+
};
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { createFunctionProxy } from '../proxies/function-proxy.js';
2+
import { createHttpProxy } from '../proxies/http-proxy.js';
3+
import { createPgBossQueue } from './pgboss.js';
4+
import type { QueueDriver } from './types.js';
5+
6+
/**
7+
* QueueDriver implementation using pg-boss for job management
8+
* and direct function calls for execution.
9+
*/
10+
export function createPgBossFunctionProxy(opts: {
11+
jobPrefix?: string;
12+
securityToken: string;
13+
connectionString: string;
14+
queueConcurrency?: number;
15+
stepEntrypoint: (request: Request) => Promise<Response>;
16+
workflowEntrypoint: (request: Request) => Promise<Response>;
17+
}): QueueDriver {
18+
return createPgBossQueue(
19+
{
20+
jobPrefix: opts.jobPrefix,
21+
connectionString: opts.connectionString,
22+
queueConcurrency: opts.queueConcurrency,
23+
},
24+
createFunctionProxy({
25+
securityToken: opts.securityToken,
26+
stepEntrypoint: opts.stepEntrypoint,
27+
workflowEntrypoint: opts.workflowEntrypoint,
28+
})
29+
);
30+
}
31+
32+
/**
33+
* QueueDriver implementation using pg-boss for job management
34+
* and HTTP for execution.
35+
*/
36+
export function createPgBossHttpProxy(config: {
37+
port?: number;
38+
baseUrl?: string;
39+
jobPrefix?: string;
40+
securityToken: string;
41+
connectionString: string;
42+
queueConcurrency?: number;
43+
}): QueueDriver {
44+
return createPgBossQueue(
45+
{
46+
jobPrefix: config.jobPrefix,
47+
connectionString: config.connectionString,
48+
queueConcurrency: config.queueConcurrency,
49+
},
50+
createHttpProxy({
51+
port: config.port,
52+
baseUrl: config.baseUrl,
53+
securityToken: config.securityToken,
54+
})
55+
);
56+
}

packages/world-postgres/src/queue-drivers/pgboss-base.ts

Lines changed: 0 additions & 130 deletions
This file was deleted.

packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts

Lines changed: 0 additions & 35 deletions
This file was deleted.

packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)