From 5ae194e6fb20e031d198e7bd073c86f2a05a25a1 Mon Sep 17 00:00:00 2001 From: owenpearson Date: Thu, 12 Mar 2026 11:21:07 +0000 Subject: [PATCH] ait: update python token streaming async patterns --- .../messaging/completion-and-cancellation.mdx | 8 ++--- .../token-streaming/message-per-response.mdx | 20 ++++++------ .../token-streaming/message-per-token.mdx | 32 ++++++++++--------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx b/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx index 0e5228f148..9216ce4548 100644 --- a/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx +++ b/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx @@ -63,18 +63,18 @@ msg_serial = result.serials[0] # Stream tokens async for event in stream: if event['type'] == 'token': - channel.append_message( + asyncio.create_task(channel.append_message( serial=msg_serial, data=event['text'], metadata={'phase': 'streaming'} - ) + )) # Signal content-part completion with an empty append -channel.append_message( +asyncio.create_task(channel.append_message( serial=msg_serial, data='', metadata={'phase': 'done'} -) +)) ``` ```java Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}"); diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx index ae6113c10e..3ef655544f 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx @@ -99,7 +99,7 @@ msg_serial = result.serials[0] async for event in stream: # Append each token as it arrives if event['type'] == 'token': - channel.append_message(serial=msg_serial, data=event['text']) + asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text'])) ``` ```java // Publish initial message and capture the serial for appending tokens @@ -150,10 +150,10 @@ for await (const event of stream) { } ``` ```python -# ✅ Do this - append without await for maximum throughput +# ✅ Do this - use create_task for fire-and-forget throughput async for event in stream: if event['type'] == 'token': - channel.append_message(serial=msg_serial, data=event['text']) + asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text'])) # ❌ Don't do this - awaiting each append reduces throughput async for event in stream: @@ -231,8 +231,8 @@ message = Message(name='response', data='') result = await channel.publish(message) msg_serial = result.serials[0] -# Track all append operations -append_operations = [] +# Track all append tasks +append_tasks = [] # Track the full response for recovery full_response = '' @@ -240,12 +240,12 @@ full_response = '' async for event in stream: if event['type'] == 'token': full_response += event['text'] - append_operations.append( - channel.append_message(serial=msg_serial, data=event['text']) + append_tasks.append( + asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text'])) ) # Check for any failures after the stream completes -results = await asyncio.gather(*append_operations, return_exceptions=True) +results = await asyncio.gather(*append_tasks, return_exceptions=True) failed = any(isinstance(r, Exception) for r in results) if failed: @@ -733,7 +733,7 @@ msg_serial = result.serials[0] # Append tokens, including extras to preserve headers async for event in stream: if event['type'] == 'token': - channel.append_message( + asyncio.create_task(channel.append_message( serial=msg_serial, data=event['text'], extras={ @@ -741,7 +741,7 @@ async for event in stream: 'responseId': 'resp_abc123' } } - ) + )) ``` ```java // Publish initial message with responseId in extras diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx index 4867624f35..aa91acbc05 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx @@ -51,11 +51,11 @@ for await (const event of stream) { } ``` ```python -# ✅ Do this - publish without await for maximum throughput +# ✅ Do this - use create_task for fire-and-forget throughput async for event in stream: if event['type'] == 'token': message = Message(name='token', data=event['text']) - channel.publish(message) + asyncio.create_task(channel.publish(message)) # ❌ Don't do this - awaiting each publish reduces throughput async for event in stream: @@ -116,8 +116,8 @@ if (failed) { ``` ```python -# Track all publish operations -publish_operations = [] +# Track all publish tasks +publish_tasks = [] # Track the full response for recovery full_response = '' @@ -125,10 +125,12 @@ full_response = '' async for event in stream: if event['type'] == 'token': full_response += event['text'] - publish_operations.append(channel.publish(Message(name='token', data=event['text']))) + publish_tasks.append( + asyncio.create_task(channel.publish(Message(name='token', data=event['text']))) + ) # Check for any failures after the stream completes -results = await asyncio.gather(*publish_operations, return_exceptions=True) +results = await asyncio.gather(*publish_tasks, return_exceptions=True) failed = any(isinstance(r, Exception) for r in results) if failed: @@ -267,7 +269,7 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') # Example: stream returns events like { 'type': 'token', 'text': 'Hello' } async for event in stream: if event['type'] == 'token': - channel.publish('token', event['text']) + asyncio.create_task(channel.publish('token', event['text'])) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}"); @@ -347,7 +349,7 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') # Example: stream returns events like { 'type': 'token', 'text': 'Hello', 'responseId': 'resp_abc123' } async for event in stream: if event['type'] == 'token': - channel.publish(Message( + asyncio.create_task(channel.publish(Message( name='token', data=event['text'], extras={ @@ -355,7 +357,7 @@ async for event in stream: 'responseId': event['responseId'] } } - )) + ))) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}"); @@ -512,17 +514,17 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') async for event in stream: if event['type'] == 'message_start': # Publish response start - channel.publish(Message( + asyncio.create_task(channel.publish(Message( name='start', extras={ 'headers': { 'responseId': event['responseId'] } } - )) + ))) elif event['type'] == 'message_delta': # Publish tokens - channel.publish(Message( + asyncio.create_task(channel.publish(Message( name='token', data=event['text'], extras={ @@ -530,17 +532,17 @@ async for event in stream: 'responseId': event['responseId'] } } - )) + ))) elif event['type'] == 'message_stop': # Publish response stop - channel.publish(Message( + asyncio.create_task(channel.publish(Message( name='stop', extras={ 'headers': { 'responseId': event['responseId'] } } - )) + ))) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");