Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/data/languages/languageData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default {
},
aiTransport: {
javascript: '2.19',
react: '2.19',
java: '1.6',
python: '3.1',
swift: '1.2',
Expand Down
168 changes: 168 additions & 0 deletions src/pages/docs/ai-transport/token-streaming/message-per-response.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,31 @@ channel.subscribe(message -> {
}
});
```
```react
const [responses, setResponses] = useState(new Map());

// Subscribe to live messages
useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
// New response started
next.set(message.serial, message.data);
break;
case 'message.append':
// Append token to existing response
next.set(message.serial, (next.get(message.serial) || '') + message.data);
break;
case 'message.update':
// Replace entire response content
next.set(message.serial, message.data);
break;
}
return next;
});
});
```
</Code>

## Client hydration <a id="hydration"/>
Expand Down Expand Up @@ -549,6 +574,31 @@ channel.subscribe(message -> {
}
});
```
```react
// Set rewind via ChannelProvider options={{ params: { rewind: '2m' } }}

const [responses, setResponses] = useState(new Map());

// Receive both recent historical (via rewind) and live messages
useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(message.serial, message.data);
break;
case 'message.append':
const current = next.get(message.serial) || '';
next.set(message.serial, current + message.data);
break;
case 'message.update':
next.set(message.serial, message.data);
break;
}
return next;
});
});
```
</Code>

Rewind supports two formats:
Expand Down Expand Up @@ -678,6 +728,46 @@ while (page != null) {
page = page.hasNext() ? page.next() : null;
}
```
```react
const [responses, setResponses] = useState(new Map());
const hydrated = useRef(false);

// Subscribe to live messages and get the history function
const { history } = useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
setResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(message.serial, message.data);
break;
case 'message.append':
next.set(message.serial, (next.get(message.serial) || '') + message.data);
break;
case 'message.update':
next.set(message.serial, message.data);
break;
}
return next;
});
});

// Fetch history on mount
useEffect(() => {
if (hydrated.current) return;
hydrated.current = true;

(async () => {
let page = await history({ untilAttach: true });
while (page) {
for (const message of page.items) {
// message.data contains the full concatenated text
setResponses((prev) => new Map(prev).set(message.serial, message.data));
}
page = page.hasNext() ? await page.next() : null;
}
})();
}, [history]);
```
</Code>

### Hydrating an in-progress response <a id="in-progress-response"/>
Expand Down Expand Up @@ -911,6 +1001,37 @@ channel.subscribe(message -> {
}
});
```
```react
// Set rewind via ChannelProvider options={{ params: { rewind: '2m' } }}

const [inProgressResponses, setInProgressResponses] = useState(new Map());

// Receive both recent historical and live messages
useChannel('ai:responses', (message) => {
const responseId = message.extras?.headers?.responseId;

if (!responseId) return;

// Skip messages for responses already loaded from database
if (completedResponses.has(responseId)) return;

setInProgressResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(responseId, message.data);
break;
case 'message.append':
next.set(responseId, (next.get(responseId) || '') + message.data);
break;
case 'message.update':
next.set(responseId, message.data);
break;
}
return next;
});
});
```
</Code>

<Aside data-type="note">
Expand Down Expand Up @@ -1118,6 +1239,53 @@ while (page != null) {
page = page.hasNext() ? page.next() : null;
}
```
```react
const [inProgressResponses, setInProgressResponses] = useState(new Map());
const hydrated = useRef(false);

// Subscribe to live messages and get the history function
const { history } = useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => {
const responseId = message.extras?.headers?.responseId;

if (!responseId) return;
if (completedResponses.has(responseId)) return;

setInProgressResponses((prev) => {
const next = new Map(prev);
switch (message.action) {
case 'message.create':
next.set(responseId, message.data);
break;
case 'message.append':
next.set(responseId, (next.get(responseId) || '') + message.data);
break;
case 'message.update':
next.set(responseId, message.data);
break;
}
return next;
});
});

// Fetch history from the last completed response until attachment
useEffect(() => {
if (hydrated.current) return;
hydrated.current = true;

(async () => {
let page = await history({ untilAttach: true, start: latestTimestamp });
while (page) {
for (const message of page.items) {
const responseId = message.extras?.headers?.responseId;
if (responseId) {
setInProgressResponses((prev) => new Map(prev).set(responseId, message.data));
}
}
page = page.hasNext() ? await page.next() : null;
}
})();
}, [history]);
```
</Code>

<Aside data-type="note">
Expand Down
Loading