Skip to content

Commit 10ce313

Browse files
authored
postgres: fix tests (#394)
* postgres: use non-deprecated drizzle signatures Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * postgres: store metadata in the hook Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * core: do not rely on module cache for world config. instead, use a global and a symbol. this makes sure that streamers can use in-memory event emitters and that it won't be compiled away into the different flow.js and step.js files. this was figured out when i was adding a hooks tests to world-testing. Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * add postgres world to all workbench packages we try to run them with the postgres world but it's not installed Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * Replace jsonb with cbor because zero byte does not work in jsonb :( Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * fix error handling: attempts start at 0 now, and not 1 like when we released. so initial attempt in postgres should reflect that. Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * drain stuff Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * fallback metadata to metadataJson Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * Make code more readable Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> * apply Vade fix Signed-off-by: Gal Schlezinger <gal@spitfire.co.il> --------- Signed-off-by: Gal Schlezinger <gal@spitfire.co.il>
1 parent 00e3345 commit 10ce313

File tree

34 files changed

+1218
-95
lines changed

34 files changed

+1218
-95
lines changed

packages/core/src/define-hook.ts

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,43 @@ import type { Hook as HookEntity } from '@workflow/world';
33
import type { Hook, HookOptions } from './create-hook.js';
44
import { resumeHook } from './runtime/resume-hook.js';
55

6+
/**
7+
* A typed hook interface for type-safe hook creation and resumption.
8+
*/
9+
export interface TypedHook<TInput, TOutput> {
10+
/**
11+
* Creates a new hook with the defined output type.
12+
*
13+
* Note: This method is not available in runtime bundles. Use it from workflow contexts only.
14+
*
15+
* @param options - Optional hook configuration
16+
* @returns A Hook that resolves to the defined output type
17+
*/
18+
create(options?: HookOptions): Hook<TOutput>;
19+
/**
20+
* Resumes a hook by sending a payload with the defined input type.
21+
* This is a type-safe wrapper around the `resumeHook` runtime function.
22+
*
23+
* @param token - The unique token identifying the hook
24+
* @param payload - The payload to send; if a `schema` is configured it is validated/transformed before resuming
25+
* @returns Promise resolving to the hook entity
26+
* @throws Error if the hook is not found or if there's an error during the process
27+
*/
28+
resume(token: string, payload: TInput): Promise<HookEntity>;
29+
}
30+
31+
export namespace TypedHook {
32+
/**
33+
* Extracts the input type from a {@link TypedHook}
34+
*/
35+
export type Input<T extends TypedHook<any, any>> = T extends TypedHook<
36+
infer I,
37+
any
38+
>
39+
? I
40+
: never;
41+
}
42+
643
/**
744
* Defines a typed hook for type-safe hook creation and resumption.
845
*
@@ -38,31 +75,13 @@ export function defineHook<TInput, TOutput = TInput>({
3875
schema,
3976
}: {
4077
schema?: StandardSchemaV1<TInput, TOutput>;
41-
} = {}) {
78+
} = {}): TypedHook<TInput, TOutput> {
4279
return {
43-
/**
44-
* Creates a new hook with the defined output type.
45-
*
46-
* Note: This method is not available in runtime bundles. Use it from workflow contexts only.
47-
*
48-
* @param _options - Optional hook configuration
49-
* @returns A Hook that resolves to the defined output type
50-
*/
5180
create(_options?: HookOptions): Hook<TOutput> {
5281
throw new Error(
5382
'`defineHook().create()` can only be called inside a workflow function.'
5483
);
5584
},
56-
57-
/**
58-
* Resumes a hook by sending a payload with the defined input type.
59-
* This is a type-safe wrapper around the `resumeHook` runtime function.
60-
*
61-
* @param token - The unique token identifying the hook
62-
* @param payload - The payload to send; if a `schema` is configured it is validated/transformed before resuming
63-
* @returns Promise resolving to the hook entity
64-
* @throws Error if the hook is not found or if there's an error during the process
65-
*/
6685
async resume(token: string, payload: TInput): Promise<HookEntity> {
6786
if (!schema?.['~standard']) {
6887
return await resumeHook(token, payload);

packages/core/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ export {
2424
type Webhook,
2525
type WebhookOptions,
2626
} from './create-hook.js';
27-
export { defineHook } from './define-hook.js';
27+
export { defineHook, type TypedHook } from './define-hook.js';
28+
export { sleep } from './sleep.js';
2829
export {
2930
getStepMetadata,
3031
type StepMetadata,
@@ -33,7 +34,6 @@ export {
3334
getWorkflowMetadata,
3435
type WorkflowMetadata,
3536
} from './step/get-workflow-metadata.js';
36-
export { sleep } from './sleep.js';
3737
export {
3838
getWritable,
3939
type WorkflowWritableStreamOptions,

packages/core/src/runtime/world.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ import { createVercelWorld } from '@workflow/world-vercel';
66

77
const require = createRequire(join(process.cwd(), 'index.js'));
88

9-
let worldCache: World | undefined;
10-
let stubbedWorldCache: World | undefined;
9+
const WorldCache = Symbol.for('@workflow/world//cache');
10+
const StubbedWorldCache = Symbol.for('@workflow/world//stubbedCache');
11+
12+
const globalSymbols: typeof globalThis & {
13+
[WorldCache]?: World;
14+
[StubbedWorldCache]?: World;
15+
} = globalThis;
1116

1217
function defaultWorld(): 'vercel' | 'embedded' {
1318
if (process.env.VERCEL_DEPLOYMENT_ID) {
@@ -67,29 +72,29 @@ export const createWorld = (): World => {
6772
* be able to re-combine getWorld and getWorldHandlers into one singleton.
6873
*/
6974
export const getWorldHandlers = (): Pick<World, 'createQueueHandler'> => {
70-
if (stubbedWorldCache) {
71-
return stubbedWorldCache;
75+
if (globalSymbols[StubbedWorldCache]) {
76+
return globalSymbols[StubbedWorldCache];
7277
}
7378
const _world = createWorld();
74-
stubbedWorldCache = _world;
79+
globalSymbols[StubbedWorldCache] = _world;
7580
return {
7681
createQueueHandler: _world.createQueueHandler,
7782
};
7883
};
7984

8085
export const getWorld = (): World => {
81-
if (worldCache) {
82-
return worldCache;
86+
if (globalSymbols[WorldCache]) {
87+
return globalSymbols[WorldCache];
8388
}
84-
worldCache = createWorld();
85-
return worldCache;
89+
globalSymbols[WorldCache] = createWorld();
90+
return globalSymbols[WorldCache];
8691
};
8792

8893
/**
8994
* Reset the cached world instance. This should be called when environment
9095
* variables change and you need to reinitialize the world with new config.
9196
*/
9297
export const setWorld = (world: World | undefined): void => {
93-
worldCache = world;
94-
stubbedWorldCache = world;
98+
globalSymbols[WorldCache] = world;
99+
globalSymbols[StubbedWorldCache] = world;
95100
};

packages/world-postgres/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"@workflow/errors": "workspace:*",
5050
"@workflow/world": "workspace:*",
5151
"@workflow/world-local": "workspace:*",
52+
"cbor-x": "1.6.0",
5253
"dotenv": "16.4.5",
5354
"drizzle-orm": "0.44.7",
5455
"pg-boss": "11.0.7",
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { decode, encode } from 'cbor-x';
2+
import { customType } from 'drizzle-orm/pg-core';
3+
4+
export function Cbor<T>() {
5+
return customType<{ data: T; driverData: Buffer }>({
6+
dataType: () => 'bytea',
7+
fromDriver: (value) => decode(value),
8+
toDriver: (value) => encode(value),
9+
});
10+
}
11+
12+
/**
13+
* Adds a `{key}Json` property to the given type V, representing a key that was
14+
* migrated to CBOR and can contain a previous JSONB representation.
15+
*
16+
* We migrated from JSONB to CBOR, but to avoid breaking changes in the codebase,
17+
* we keep both representations in the database, and therefore we need to extend
18+
* the types accordingly.
19+
*/
20+
export type Cborized<V extends object, K extends keyof V> = V & {
21+
[key in `${Extract<K, string>}Json`]: unknown;
22+
};
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE "workflow"."workflow_runs" ALTER COLUMN "input" DROP NOT NULL;--> statement-breakpoint
2+
ALTER TABLE "workflow"."workflow_steps" ALTER COLUMN "input" DROP NOT NULL;--> statement-breakpoint
3+
ALTER TABLE "workflow"."workflow_events" ADD COLUMN "payload_cbor" "bytea";--> statement-breakpoint
4+
ALTER TABLE "workflow"."workflow_hooks" ADD COLUMN "metadata_cbor" "bytea";--> statement-breakpoint
5+
ALTER TABLE "workflow"."workflow_runs" ADD COLUMN "output_cbor" "bytea";--> statement-breakpoint
6+
ALTER TABLE "workflow"."workflow_runs" ADD COLUMN "execution_context_cbor" "bytea";--> statement-breakpoint
7+
ALTER TABLE "workflow"."workflow_runs" ADD COLUMN "input_cbor" "bytea";--> statement-breakpoint
8+
ALTER TABLE "workflow"."workflow_steps" ADD COLUMN "input_cbor" "bytea";--> statement-breakpoint
9+
ALTER TABLE "workflow"."workflow_steps" ADD COLUMN "output_cbor" "bytea";--> statement-breakpoint
10+
ALTER TABLE "workflow"."workflow_runs" DROP COLUMN "error_code";--> statement-breakpoint
11+
ALTER TABLE "workflow"."workflow_steps" DROP COLUMN "error_code";

0 commit comments

Comments
 (0)