Skip to content

Commit 069a4ab

Browse files
committed
refactor: browser-use agent & enhance streaming parser
1 parent 1229b23 commit 069a4ab

File tree

9 files changed

+570
-85
lines changed

9 files changed

+570
-85
lines changed

examples/simple-browser-use/components/BrowserAgent.ts

Lines changed: 199 additions & 60 deletions
Large diffs are not rendered by default.

examples/simple-browser-use/components/BrowserStateComponent.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ export class BrowserStateComponent extends Component {
1313
currentUrl: string;
1414
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1515
domSnapshot: { interactive: InteractiveElement[]; meta: any };
16+
domSnapshotSize: number;
1617
lastAction?: string;
1718
lastError?: string;
1819
lastUpdateTime: number;
1920
} = {
2021
currentUrl: 'about:blank',
2122
domSnapshot: { interactive: [], meta: {} },
23+
domSnapshotSize: 0,
2224
lastUpdateTime: Date.now(),
2325
};
2426

@@ -40,7 +42,7 @@ export class BrowserStateComponent extends Component {
4042
// 更新状态
4143
$i('update').receive((update: Partial<typeof this.state>) => {
4244
const oldUrl = this.state.currentUrl;
43-
const oldDomSize = JSON.stringify(this.state.domSnapshot).length;
45+
const oldDomSize = this.state.domSnapshotSize;
4446

4547
// 更新状态
4648
this.state = {
@@ -58,10 +60,8 @@ export class BrowserStateComponent extends Component {
5860
}
5961

6062
// 记录 DOM 变化
61-
if (oldDomSize !== JSON.stringify(this.state.domSnapshot).length) {
62-
console.log(
63-
` DOM 快照: ${oldDomSize} => ${JSON.stringify(this.state.domSnapshot).length} 字符`
64-
);
63+
if (typeof update.domSnapshotSize === 'number' && oldDomSize !== update.domSnapshotSize) {
64+
console.log(` DOM 快照: ${oldDomSize} => ${update.domSnapshotSize} 字符`);
6565
}
6666

6767
// 记录最后动作

examples/simple-browser-use/components/PlanningAgent.ts

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Component } from '@astack-tech/core';
22
import type { ModelProvider } from '@astack-tech/components';
3-
import { Agent, AgentConfig, AgentOutput } from '@astack-tech/components';
3+
import { StreamingAgent, type AgentConfig, type AgentOutput } from '@astack-tech/components';
44

55
// 定义组件配置类型接口
66
interface PlanningAgentConfig {
@@ -18,16 +18,17 @@ interface Plan {
1818
* 遵循 AStack 的 "一切皆组件" 原则,支持零适配层设计
1919
*/
2020
export class PlanningAgent extends Component {
21-
private agent: Agent;
21+
private agent: StreamingAgent;
2222

2323
constructor({ modelProvider }: PlanningAgentConfig) {
2424
super({});
2525
// 组件端口定义
2626
Component.Port.I('intent').attach(this);
2727
Component.Port.O('plan').attach(this);
28+
Component.Port.O('stream').attach(this);
2829

2930
// 初始化规划 Agent
30-
this.agent = new Agent({
31+
this.agent = new StreamingAgent({
3132
model: modelProvider,
3233
systemPrompt: this.buildPlanningPrompt(),
3334
} as AgentConfig);
@@ -37,13 +38,80 @@ export class PlanningAgent extends Component {
3738
_transform($i: any, $o: any) {
3839
$i('intent').receive(async (userIntent: string) => {
3940
try {
40-
// 分析用户意图并生成执行计划
41-
const agentOutput: AgentOutput = await this.agent.run(userIntent);
41+
let finalOutput: AgentOutput | undefined;
42+
let streamedText = '';
43+
44+
for await (const chunk of this.agent.runStream(userIntent)) {
45+
if (chunk.type === 'assistant_message') {
46+
const content = chunk.content ?? '';
47+
let delta = content;
48+
if (content.startsWith(streamedText)) {
49+
delta = content.slice(streamedText.length);
50+
}
51+
streamedText = content;
52+
53+
if (delta) {
54+
$o('stream').send({
55+
source: 'planner',
56+
chunk: {
57+
...chunk,
58+
content: delta,
59+
},
60+
});
61+
}
62+
} else if (chunk.type !== 'model_thinking') {
63+
$o('stream').send({
64+
source: 'planner',
65+
chunk,
66+
});
67+
}
68+
69+
if (chunk.type === 'completed') {
70+
finalOutput = {
71+
message: chunk.finalMessage ?? '',
72+
history: chunk.history ?? [],
73+
toolCalls: chunk.allToolCalls ?? [],
74+
};
75+
76+
// 最终完整结果再发送给前端一次
77+
if (chunk.finalMessage && chunk.finalMessage !== streamedText) {
78+
const delta = chunk.finalMessage.startsWith(streamedText)
79+
? chunk.finalMessage.slice(streamedText.length)
80+
: chunk.finalMessage;
81+
if (delta) {
82+
streamedText = chunk.finalMessage;
83+
$o('stream').send({
84+
source: 'planner',
85+
chunk: {
86+
type: 'assistant_message',
87+
content: delta,
88+
toolCalls: chunk.allToolCalls,
89+
},
90+
});
91+
}
92+
}
93+
}
94+
}
95+
96+
const finalOutputNormalized: AgentOutput =
97+
finalOutput ??
98+
({
99+
message: '',
100+
history: [],
101+
toolCalls: [],
102+
} as AgentOutput);
42103

43104
// 将计划发送到输出端口
44-
$o('plan').send(this.structurePlan(agentOutput.message));
105+
$o('plan').send(this.structurePlan(finalOutputNormalized.message));
45106
} catch (error: unknown) {
46107
$o('plan').send({ error: error instanceof Error ? error.message : String(error) });
108+
$o('stream').send({
109+
source: 'planner',
110+
chunk: {
111+
type: 'error',
112+
error: error instanceof Error ? error.message : String(error),
113+
},
114+
});
47115
}
48116
});
49117
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { Component } from '@astack-tech/core';
2+
import type { StreamingChunk } from '@astack-tech/components';
3+
4+
export interface StreamEvent {
5+
source: 'planner' | 'browser';
6+
chunk: StreamingChunk;
7+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
8+
context?: Record<string, any>;
9+
}
10+
11+
/**
12+
* StreamCollector 用于收集并转发来自各个组件的流式输出。
13+
* 组件会将事件打印到终端,并通过 events 端口转发给下游组件。
14+
* 为了提升可读性,这里对文本类 chunk 做了增量缓冲,模拟打字机效果。
15+
*/
16+
export class StreamCollector extends Component {
17+
private buffers: Record<string, string> = {};
18+
private prefixPrinted: Set<string> = new Set();
19+
20+
constructor() {
21+
super({});
22+
Component.Port.I('planner').attach(this);
23+
Component.Port.I('browser').attach(this);
24+
Component.Port.O('events').attach(this);
25+
}
26+
27+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
28+
_transform($i: any, $o: any): void {
29+
const forward =
30+
(fallbackSource: StreamEvent['source']) =>
31+
(event: StreamEvent | StreamingChunk | undefined) => {
32+
if (!event) return;
33+
34+
const payload: StreamEvent =
35+
typeof event === 'object' && 'chunk' in event
36+
? { source: event.source ?? fallbackSource, chunk: event.chunk, context: event.context }
37+
: { source: fallbackSource, chunk: event as StreamingChunk };
38+
39+
this.logEvent(payload);
40+
$o('events').send(payload);
41+
};
42+
43+
$i('planner').receive(forward('planner'));
44+
$i('browser').receive(forward('browser'));
45+
}
46+
47+
private logEvent(event: StreamEvent) {
48+
const { source, chunk } = event;
49+
const baseLabel = `[StreamCollector][${source}]`;
50+
51+
switch (chunk.type) {
52+
case 'model_thinking':
53+
case 'assistant_message': {
54+
this.printStreamingText(baseLabel, source, chunk.content ?? '');
55+
break;
56+
}
57+
case 'iteration_start':
58+
this.ensureLineBreak(source);
59+
console.log(`${baseLabel} iteration ${chunk.iteration ?? 0} started`);
60+
break;
61+
case 'tool_start':
62+
this.ensureLineBreak(source);
63+
console.log(`${baseLabel} tool ${chunk.toolName ?? 'unknown'} started`);
64+
break;
65+
case 'tool_result':
66+
this.ensureLineBreak(source);
67+
console.log(`${baseLabel} tool ${chunk.toolName ?? 'unknown'} result`, chunk.result);
68+
break;
69+
case 'completed':
70+
this.ensureLineBreak(source);
71+
console.log(`${baseLabel} completed`);
72+
if (chunk.finalMessage) {
73+
console.log(`${baseLabel} final message: ${chunk.finalMessage}`);
74+
}
75+
delete this.buffers[source];
76+
break;
77+
case 'error':
78+
this.ensureLineBreak(source);
79+
console.error(`${baseLabel} error: ${chunk.error ?? 'unknown error'}`);
80+
delete this.buffers[source];
81+
break;
82+
default:
83+
this.ensureLineBreak(source);
84+
console.log(`${baseLabel} chunk`, chunk);
85+
}
86+
}
87+
88+
private printStreamingText(label: string, source: string, text: string) {
89+
if (!text) return;
90+
91+
const previous = this.buffers[source] ?? '';
92+
let addition = text;
93+
94+
if (text.startsWith(previous)) {
95+
addition = text.slice(previous.length);
96+
}
97+
98+
if (!addition) return;
99+
100+
this.buffers[source] = text;
101+
this.writeWithPrefix(label, source, addition);
102+
}
103+
104+
private writeWithPrefix(label: string, source: string, text: string) {
105+
const prefix = `${label} `;
106+
107+
if (!this.prefixPrinted.has(source)) {
108+
this.prefixPrinted.add(source);
109+
this.writeRaw(prefix);
110+
}
111+
112+
this.writeRaw(text);
113+
}
114+
115+
private ensureLineBreak(source: string) {
116+
if (this.prefixPrinted.has(source)) {
117+
this.prefixPrinted.delete(source);
118+
this.writeRaw('\n');
119+
}
120+
}
121+
122+
private writeRaw(text: string) {
123+
if (typeof process !== 'undefined' && process.stdout) {
124+
process.stdout.write(text);
125+
} else {
126+
// Fallback: 浏览器或其他环境下直接打印
127+
// eslint-disable-next-line no-console
128+
console.log(text);
129+
}
130+
}
131+
}
132+
133+
export default StreamCollector;

examples/simple-browser-use/index.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { PlanningAgent } from './components/PlanningAgent';
44
import { BrowserAgent } from './components/BrowserAgent';
55
import { ResultFormatter } from './components/ResultFormatter';
66
import { BrowserStateComponent } from './components/BrowserStateComponent';
7+
import { StreamCollector } from './components/StreamCollector';
78
import type { ModelProvider as ModelProviderType } from '@astack-tech/components';
89

910
/**
@@ -21,7 +22,8 @@ class OutputCollector extends Component {
2122
_transform($i: any, $o: any) {
2223
$i('result').receive((data: unknown) => {
2324
console.log('[OutputCollector] 收集到最终结果', data);
24-
$o('output').send(data); // 返回结果给 pipeline
25+
this.outPort.send(data); // 默认输出端口供 pipeline 回收
26+
$o('output').send(data); // 保持向外暴露的自定义端口
2527
$o('result').send(data); // 将结果转发到停止端口
2628
});
2729
}
@@ -48,15 +50,27 @@ async function main() {
4850
temperature: 0.5,
4951
}) as ModelProviderType;
5052

53+
const parsedInterval = Number(process.env.BROWSER_SNAPSHOT_INTERVAL_MS);
54+
const snapshotIntervalMs =
55+
Number.isFinite(parsedInterval) && parsedInterval > 0 ? parsedInterval : 1500;
56+
const headless = process.env.BROWSER_HEADLESS !== 'false';
57+
58+
const browserAgent = new BrowserAgent({
59+
modelProvider,
60+
headless,
61+
snapshotIntervalMs,
62+
});
63+
5164
// 创建主 Pipeline
5265
const pipeline = new Pipeline();
5366

5467
// 添加组件,遵循 "一切皆组件" 原则
5568
pipeline.addComponent('planner', new PlanningAgent({ modelProvider }));
5669
pipeline.addComponent('browserState', new BrowserStateComponent());
57-
pipeline.addComponent('browser', new BrowserAgent({ modelProvider }));
70+
pipeline.addComponent('browser', browserAgent);
5871
pipeline.addComponent('formatter', new ResultFormatter());
5972
pipeline.addComponent('collector', new OutputCollector()); // 添加输出收集组件
73+
pipeline.addComponent('stream', new StreamCollector());
6074

6175
// 建立组件之间的连接
6276

@@ -66,9 +80,11 @@ async function main() {
6680

6781
// 任务规划与执行
6882
pipeline.connect('planner.plan', 'browser.task'); // 规划器输出任务到浏览器
69-
83+
pipeline.connect('planner.stream', 'stream.planner');
84+
pipeline.connect('browser.progress', 'stream.browser');
7085
pipeline.connect('browser.result', 'browser.stop');
7186
pipeline.connect('browser.result', 'formatter.data'); // 浏览器结果输出到格式化组件
87+
pipeline.connect('formatter.out', 'collector.result'); // 格式化结果传递给收集器
7288

7389
try {
7490
// 示例:执行自然语言任务

packages/components/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@astack-tech/components",
3-
"version": "0.1.1-beta.2",
3+
"version": "0.1.1-beta.3",
44
"description": "Components for the Astack AI Framework.",
55
"main": "dist/index.cjs",
66
"module": "dist/index.js",

0 commit comments

Comments
 (0)