Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,18 @@ msg_serial = result.serials[0]
# Stream tokens
async for event in stream:
if event['type'] == 'token':
channel.append_message(
asyncio.create_task(channel.append_message(
serial=msg_serial,
data=event['text'],
metadata={'phase': 'streaming'}
)
))

# Signal content-part completion with an empty append
channel.append_message(
asyncio.create_task(channel.append_message(
serial=msg_serial,
data='',
metadata={'phase': 'done'}
)
))
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ msg_serial = result.serials[0]
async for event in stream:
# Append each token as it arrives
if event['type'] == 'token':
channel.append_message(serial=msg_serial, data=event['text'])
asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text']))
```
```java
// Publish initial message and capture the serial for appending tokens
Expand Down Expand Up @@ -150,10 +150,10 @@ for await (const event of stream) {
}
```
```python
# ✅ Do this - append without await for maximum throughput
# ✅ Do this - use create_task for fire-and-forget throughput
async for event in stream:
if event['type'] == 'token':
channel.append_message(serial=msg_serial, data=event['text'])
asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text']))

# ❌ Don't do this - awaiting each append reduces throughput
async for event in stream:
Expand Down Expand Up @@ -231,21 +231,21 @@ message = Message(name='response', data='')
result = await channel.publish(message)
msg_serial = result.serials[0]

# Track all append operations
append_operations = []
# Track all append tasks
append_tasks = []

# Track the full response for recovery
full_response = ''

async for event in stream:
if event['type'] == 'token':
full_response += event['text']
append_operations.append(
channel.append_message(serial=msg_serial, data=event['text'])
append_tasks.append(
asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text']))
)

# Check for any failures after the stream completes
results = await asyncio.gather(*append_operations, return_exceptions=True)
results = await asyncio.gather(*append_tasks, return_exceptions=True)
failed = any(isinstance(r, Exception) for r in results)

if failed:
Expand Down Expand Up @@ -733,15 +733,15 @@ msg_serial = result.serials[0]
# Append tokens, including extras to preserve headers
async for event in stream:
if event['type'] == 'token':
channel.append_message(
asyncio.create_task(channel.append_message(
serial=msg_serial,
data=event['text'],
extras={
'headers': {
'responseId': 'resp_abc123'
}
}
)
))
```
```java
// Publish initial message with responseId in extras
Expand Down
32 changes: 17 additions & 15 deletions src/pages/docs/ai-transport/token-streaming/message-per-token.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ for await (const event of stream) {
}
```
```python
# ✅ Do this - publish without await for maximum throughput
# ✅ Do this - use create_task for fire-and-forget throughput
async for event in stream:
if event['type'] == 'token':
message = Message(name='token', data=event['text'])
channel.publish(message)
asyncio.create_task(channel.publish(message))

# ❌ Don't do this - awaiting each publish reduces throughput
async for event in stream:
Expand Down Expand Up @@ -116,19 +116,21 @@ if (failed) {
```

```python
# Track all publish operations
publish_operations = []
# Track all publish tasks
publish_tasks = []

# Track the full response for recovery
full_response = ''

async for event in stream:
if event['type'] == 'token':
full_response += event['text']
publish_operations.append(channel.publish(Message(name='token', data=event['text'])))
publish_tasks.append(
asyncio.create_task(channel.publish(Message(name='token', data=event['text'])))
)

# Check for any failures after the stream completes
results = await asyncio.gather(*publish_operations, return_exceptions=True)
results = await asyncio.gather(*publish_tasks, return_exceptions=True)
failed = any(isinstance(r, Exception) for r in results)

if failed:
Expand Down Expand Up @@ -267,7 +269,7 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
# Example: stream returns events like { 'type': 'token', 'text': 'Hello' }
async for event in stream:
if event['type'] == 'token':
channel.publish('token', event['text'])
asyncio.create_task(channel.publish('token', event['text']))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down Expand Up @@ -347,15 +349,15 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
# Example: stream returns events like { 'type': 'token', 'text': 'Hello', 'responseId': 'resp_abc123' }
async for event in stream:
if event['type'] == 'token':
channel.publish(Message(
asyncio.create_task(channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
))
)))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down Expand Up @@ -512,35 +514,35 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
async for event in stream:
if event['type'] == 'message_start':
# Publish response start
channel.publish(Message(
asyncio.create_task(channel.publish(Message(
name='start',
extras={
'headers': {
'responseId': event['responseId']
}
}
))
)))
elif event['type'] == 'message_delta':
# Publish tokens
channel.publish(Message(
asyncio.create_task(channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
))
)))
elif event['type'] == 'message_stop':
# Publish response stop
channel.publish(Message(
asyncio.create_task(channel.publish(Message(
name='stop',
extras={
'headers': {
'responseId': event['responseId']
}
}
))
)))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down