Skip to content

Commit 7e7c5ec

Browse files
Nick Sweetingpirate
authored andcommitted
use event bus for llm events
1 parent 3eb42da commit 7e7c5ec

File tree

12 files changed

+1086
-7
lines changed

12 files changed

+1086
-7
lines changed

packages/core/lib/inference.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ export async function act({
435435
}
436436

437437
const start = Date.now();
438-
const rawResponse = await llmClient.createChatCompletion<ActResponse>({
438+
const rawResponse = await createChatCompletionViaEventBus<ActResponse>(eventBus, {
439439
options: {
440440
messages,
441441
response_model: {

packages/core/lib/v3/eventBus.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Central Event Bus for Stagehand
3+
*
4+
* Single event emitter shared by:
5+
* - V3 class (LLM events, library events)
6+
* - StagehandServer (server lifecycle, request/response events)
7+
* - External listeners (cloud servers, monitoring, etc.)
8+
*/
9+
10+
import { EventEmitter } from "events";
11+
import type { StagehandServerEventMap } from "./server/events";
12+
13+
/**
14+
* Type-safe event bus for all Stagehand events
15+
*/
16+
export class StagehandEventBus extends EventEmitter {
17+
/**
18+
* Emit an event and wait for all async listeners to complete
19+
*/
20+
async emitAsync<K extends keyof StagehandServerEventMap>(
21+
event: K,
22+
data: StagehandServerEventMap[K],
23+
): Promise<void> {
24+
const listeners = this.listeners(event);
25+
await Promise.all(listeners.map((listener) => listener(data)));
26+
}
27+
28+
/**
29+
* Type-safe event listener
30+
*/
31+
on<K extends keyof StagehandServerEventMap>(
32+
event: K,
33+
listener: (data: StagehandServerEventMap[K]) => void | Promise<void>,
34+
): this {
35+
return super.on(event, listener);
36+
}
37+
38+
/**
39+
* Type-safe one-time event listener
40+
*/
41+
once<K extends keyof StagehandServerEventMap>(
42+
event: K,
43+
listener: (data: StagehandServerEventMap[K]) => void | Promise<void>,
44+
): this {
45+
return super.once(event, listener);
46+
}
47+
48+
/**
49+
* Type-safe remove listener
50+
*/
51+
off<K extends keyof StagehandServerEventMap>(
52+
event: K,
53+
listener: (data: StagehandServerEventMap[K]) => void | Promise<void>,
54+
): this {
55+
return super.off(event, listener);
56+
}
57+
}
58+
59+
/**
60+
* Create a new event bus instance
61+
*/
62+
export function createEventBus(): StagehandEventBus {
63+
return new StagehandEventBus();
64+
}

packages/core/lib/v3/handlers/actHandler.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
performUnderstudyMethod,
2323
waitForDomNetworkQuiet,
2424
} from "./handlerUtils/actHandlerUtils";
25+
import type { StagehandEventBus } from "../eventBus";
2526

2627
export class ActHandler {
2728
private readonly llmClient: LLMClient;
@@ -40,12 +41,14 @@ export class ActHandler {
4041
inferenceTimeMs: number,
4142
) => void;
4243
private readonly defaultDomSettleTimeoutMs?: number;
44+
private readonly eventBus: StagehandEventBus;
4345

4446
constructor(
4547
llmClient: LLMClient,
4648
defaultModelName: AvailableModel,
4749
defaultClientOptions: ClientOptions,
4850
resolveLlmClient: (model?: ModelConfiguration) => LLMClient,
51+
eventBus: StagehandEventBus,
4952
systemPrompt?: string,
5053
logInferenceToFile?: boolean,
5154
selfHeal?: boolean,
@@ -63,6 +66,7 @@ export class ActHandler {
6366
this.defaultModelName = defaultModelName;
6467
this.defaultClientOptions = defaultClientOptions;
6568
this.resolveLlmClient = resolveLlmClient;
69+
this.eventBus = eventBus;
6670
this.systemPrompt = systemPrompt ?? "";
6771
this.logInferenceToFile = logInferenceToFile ?? false;
6872
this.selfHeal = !!selfHeal;
@@ -100,6 +104,7 @@ export class ActHandler {
100104
instruction: observeActInstruction,
101105
domElements: combinedTree,
102106
llmClient,
107+
eventBus: this.eventBus,
103108
userProvidedInstructions: this.systemPrompt,
104109
logger: v3Logger,
105110
logInferenceToFile: this.logInferenceToFile,
@@ -230,6 +235,7 @@ export class ActHandler {
230235
instruction: stepTwoInstructions,
231236
domElements: diffedTree,
232237
llmClient,
238+
eventBus: this.eventBus,
233239
userProvidedInstructions: this.systemPrompt,
234240
logger: v3Logger,
235241
logInferenceToFile: this.logInferenceToFile,
@@ -422,6 +428,7 @@ export class ActHandler {
422428
instruction,
423429
domElements: combinedTree,
424430
llmClient: effectiveClient,
431+
eventBus: this.eventBus,
425432
userProvidedInstructions: this.systemPrompt,
426433
logger: v3Logger,
427434
logInferenceToFile: this.logInferenceToFile,

packages/core/lib/v3/handlers/extractHandler.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type {
2525
StagehandZodObject,
2626
StagehandZodSchema,
2727
} from "../zodCompat";
28+
import type { StagehandEventBus } from "../eventBus";
2829

2930
/**
3031
* Scans the provided Zod schema for any `z.string().url()` fields and
@@ -60,6 +61,7 @@ export class ExtractHandler {
6061
private readonly defaultModelName: AvailableModel;
6162
private readonly defaultClientOptions: ClientOptions;
6263
private readonly resolveLlmClient: (model?: ModelConfiguration) => LLMClient;
64+
private readonly eventBus: StagehandEventBus;
6365
private readonly systemPrompt: string;
6466
private readonly logInferenceToFile: boolean;
6567
private readonly experimental: boolean;
@@ -77,6 +79,7 @@ export class ExtractHandler {
7779
defaultModelName: AvailableModel,
7880
defaultClientOptions: ClientOptions,
7981
resolveLlmClient: (model?: ModelConfiguration) => LLMClient,
82+
eventBus: StagehandEventBus,
8083
systemPrompt?: string,
8184
logInferenceToFile?: boolean,
8285
experimental?: boolean,
@@ -93,6 +96,7 @@ export class ExtractHandler {
9396
this.defaultModelName = defaultModelName;
9497
this.defaultClientOptions = defaultClientOptions;
9598
this.resolveLlmClient = resolveLlmClient;
99+
this.eventBus = eventBus;
96100
this.systemPrompt = systemPrompt ?? "";
97101
this.logInferenceToFile = logInferenceToFile ?? false;
98102
this.experimental = experimental ?? false;
@@ -171,6 +175,7 @@ export class ExtractHandler {
171175
domElements: combinedTree,
172176
schema: transformedSchema as StagehandZodObject,
173177
llmClient,
178+
eventBus: this.eventBus,
174179
userProvidedInstructions: this.systemPrompt,
175180
logger: v3Logger,
176181
logInferenceToFile: this.logInferenceToFile,

packages/core/lib/v3/handlers/observeHandler.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import {
1313
ClientOptions,
1414
ModelConfiguration,
1515
} from "../types/public/model";
16+
import type { StagehandEventBus } from "../eventBus";
1617

1718
export class ObserveHandler {
1819
private readonly llmClient: LLMClient;
1920
private readonly defaultModelName: AvailableModel;
2021
private readonly defaultClientOptions: ClientOptions;
2122
private readonly resolveLlmClient: (model?: ModelConfiguration) => LLMClient;
23+
private readonly eventBus: StagehandEventBus;
2224
private readonly systemPrompt: string;
2325
private readonly logInferenceToFile: boolean;
2426
private readonly experimental: boolean;
@@ -36,6 +38,7 @@ export class ObserveHandler {
3638
defaultModelName: AvailableModel,
3739
defaultClientOptions: ClientOptions,
3840
resolveLlmClient: (model?: ModelConfiguration) => LLMClient,
41+
eventBus: StagehandEventBus,
3942
systemPrompt?: string,
4043
logInferenceToFile?: boolean,
4144
experimental?: boolean,
@@ -52,6 +55,7 @@ export class ObserveHandler {
5255
this.defaultModelName = defaultModelName;
5356
this.defaultClientOptions = defaultClientOptions;
5457
this.resolveLlmClient = resolveLlmClient;
58+
this.eventBus = eventBus;
5559
this.systemPrompt = systemPrompt ?? "";
5660
this.logInferenceToFile = logInferenceToFile ?? false;
5761
this.experimental = experimental ?? false;
@@ -101,6 +105,7 @@ export class ObserveHandler {
101105
instruction: effectiveInstruction,
102106
domElements: combinedTree,
103107
llmClient,
108+
eventBus: this.eventBus,
104109
userProvidedInstructions: this.systemPrompt,
105110
logger: v3Logger,
106111
logInferenceToFile: this.logInferenceToFile,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/**
2+
* LLM Event Bridge - Routes LLM requests through the event bus
3+
*
4+
* This module provides a bridge between code that needs LLM responses
5+
* and the actual LLM implementations. It uses the event bus to allow
6+
* remote execution of LLM calls.
7+
*/
8+
9+
import { randomUUID } from "crypto";
10+
import type { StagehandEventBus } from "../eventBus";
11+
import type {
12+
ChatCompletionOptions,
13+
CreateChatCompletionOptions,
14+
LLMParsedResponse,
15+
LLMResponse,
16+
} from "./LLMClient";
17+
import type { LogLine } from "../types/public";
18+
19+
/**
20+
* Make an LLM request via the event bus and wait for a response
21+
*
22+
* This function emits a StagehandLLMRequest event and waits for a
23+
* StagehandLLMResponse event with the same requestId.
24+
*
25+
* Returns the same structure as llmClient.createChatCompletion: { data: T, usage?: LLMUsage }
26+
*/
27+
export async function createChatCompletionViaEventBus<T>(
28+
eventBus: StagehandEventBus,
29+
options: CreateChatCompletionOptions,
30+
sessionId?: string,
31+
): Promise<LLMParsedResponse<T>> {
32+
const requestId = randomUUID();
33+
const startTime = Date.now();
34+
35+
// Create a promise that will resolve when we get the response
36+
const responsePromise = new Promise<LLMParsedResponse<T>>((resolve, reject) => {
37+
// Set up a one-time listener for the response
38+
const responseHandler = (data: any) => {
39+
// Only handle responses for this specific request
40+
if (data.requestId === requestId) {
41+
// Remove the listener
42+
eventBus.off("StagehandLLMResponse", responseHandler);
43+
eventBus.off("StagehandLLMError", errorHandler);
44+
45+
// Check if there was an error
46+
if (data.error) {
47+
reject(new Error(data.error.message));
48+
} else {
49+
// Return the same structure as llmClient.createChatCompletion
50+
if (data.parsedResponse) {
51+
resolve(data.parsedResponse as LLMParsedResponse<T>);
52+
} else {
53+
resolve({ data: data.rawResponse as T, usage: data.usage });
54+
}
55+
}
56+
}
57+
};
58+
59+
const errorHandler = (data: any) => {
60+
if (data.requestId === requestId) {
61+
eventBus.off("StagehandLLMResponse", responseHandler);
62+
eventBus.off("StagehandLLMError", errorHandler);
63+
reject(new Error(data.error.message));
64+
}
65+
};
66+
67+
// Listen for both response and error events
68+
eventBus.on("StagehandLLMResponse", responseHandler);
69+
eventBus.on("StagehandLLMError", errorHandler);
70+
71+
// Set a timeout to prevent hanging forever
72+
setTimeout(() => {
73+
eventBus.off("StagehandLLMResponse", responseHandler);
74+
eventBus.off("StagehandLLMError", errorHandler);
75+
reject(new Error("LLM request timeout after 5 minutes"));
76+
}, 5 * 60 * 1000); // 5 minute timeout
77+
});
78+
79+
// Emit the request event
80+
await eventBus.emitAsync("StagehandLLMRequest", {
81+
type: "StagehandLLMRequest",
82+
timestamp: new Date(),
83+
requestId,
84+
sessionId,
85+
modelName: options.options.messages[0]?.role ? "unknown" : "unknown", // Will be set by handler
86+
temperature: options.options.temperature,
87+
maxTokens: options.options.maxOutputTokens,
88+
messages: options.options.messages.map((msg) => ({
89+
role: msg.role,
90+
content:
91+
typeof msg.content === "string"
92+
? msg.content
93+
: msg.content.map((c) => ({
94+
type: c.type,
95+
text: c.text,
96+
image: (c as any).image_url?.url || (c as any).source?.data,
97+
})),
98+
})),
99+
tools: options.options.tools?.map((tool) => ({
100+
name: tool.name,
101+
description: tool.description,
102+
parameters: tool.parameters as Record<string, unknown>,
103+
})),
104+
schema: options.options.response_model?.schema
105+
? (options.options.response_model.schema as any)
106+
: undefined,
107+
requestType: undefined, // Will be determined by context
108+
});
109+
110+
// Wait for and return the response
111+
return responsePromise;
112+
}
113+
114+
/**
115+
* Type guard to check if options include a response_model
116+
*/
117+
export function hasResponseModel(
118+
options: CreateChatCompletionOptions,
119+
): options is CreateChatCompletionOptions & {
120+
options: { response_model: { name: string; schema: any } };
121+
} {
122+
return !!options.options.response_model;
123+
}

0 commit comments

Comments
 (0)