From 542cf211f6bbc176e81c6037c47ad92d21ba56e4 Mon Sep 17 00:00:00 2001 From: zak Date: Wed, 25 Feb 2026 17:32:56 +0000 Subject: [PATCH 1/3] Add server-side publishing guide for AI Transport New page in AI Transport > Token streaming covering Realtime connections, message ordering guarantees, transient publishing and channel limits, per-connection rate limits for both message-per-response and message-per-token patterns, and a connection pool example for handling multiple concurrent streams. --- src/data/nav/aitransport.ts | 4 + .../token-streaming/server-publishing.mdx | 266 ++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 src/pages/docs/ai-transport/token-streaming/server-publishing.mdx diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts index dfdb5430b6..567ab8bc8d 100644 --- a/src/data/nav/aitransport.ts +++ b/src/data/nav/aitransport.ts @@ -33,6 +33,10 @@ export default { name: 'Token streaming limits', link: '/docs/ai-transport/token-streaming/token-rate-limits', }, + { + name: 'Publish from your server', + link: '/docs/ai-transport/token-streaming/server-publishing', + }, ], }, { diff --git a/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx new file mode 100644 index 0000000000..df7587da52 --- /dev/null +++ b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx @@ -0,0 +1,266 @@ +--- +title: Publish from your server +meta_description: "Learn how to publish AI response tokens from your server over a Realtime WebSocket connection, covering ordering, channel limits, and concurrent streams." +--- + +When streaming AI responses with [message per response](/docs/ai-transport/token-streaming/message-per-response) or [message per token](/docs/ai-transport/token-streaming/message-per-token), your server should publish tokens to Ably channels using a Realtime client. 
Realtime clients maintain persistent WebSocket connections to the Ably service, which provide the low-latency, ordered delivery needed for token streaming. + +## Realtime connections + +Use a Realtime client for server-side publishing with `message.append` or `message.create`. A Realtime client maintains a WebSocket connection to the Ably service, which provides low-latency publishing and guarantees that messages published on the same connection are delivered to subscribers in the order they were published. For more information, see [Realtime and REST](/docs/basics#realtime-and-rest). + +While `publish()` and `appendMessage()` are available on both [REST](/docs/api/rest-sdk) and Realtime clients, REST does not guarantee [message ordering](/docs/platform/architecture/message-ordering) at high publish rates. Use a Realtime client when publishing at the rates typical of LLM token streaming. + +Create a Realtime client on your server: + + +```javascript +const ably = new Ably.Realtime({ + key: '{{API_KEY}}', + echoMessages: false +}); +``` +```python +ably = AblyRealtime( + key='{{API_KEY}}', + echo_messages=False +) +``` +```java +ClientOptions options = new ClientOptions(); +options.key = "{{API_KEY}}"; +options.echoMessages = false; +AblyRealtime ably = new AblyRealtime(options); +``` + + + + +## Message ordering + +Ably guarantees that messages published on a single connection are delivered to subscribers in the order they were published. This ordering guarantee is essential for token streaming, because tokens must arrive in sequence for the final message to be correct. + +This guarantee is per-connection. If you use [multiple connections](#multiple-streams) to handle higher concurrency, route all operations for a given channel through the same connection. This ensures that all `publish()` and `appendMessage()` calls for a given response maintain their order. 
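To make the per-connection requirement concrete, channel-to-connection routing can be reduced to a pure function. The sketch below is illustrative only, not an Ably SDK API; it assumes you hold a fixed-size array of Realtime clients and want a stable index for each channel name:

```javascript
// Illustrative sketch: deterministically map a channel name to one client in
// a fixed-size pool, so every publish and append for that channel uses the
// same underlying connection and therefore keeps its ordering guarantee.
function hashCode(str) {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0; // 32-bit Java-style string hash
  }
  return Math.abs(hash);
}

function clientIndexFor(channelName, poolSize) {
  return hashCode(channelName) % poolSize;
}
```

The same channel name always yields the same index, so all `publish()` and `appendMessage()` calls for a given response stay on one connection.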
+ +For more detail on how Ably preserves message order across its globally distributed infrastructure, see [message ordering](/docs/platform/architecture/message-ordering). + +## Transient publishing and channel limits + +In a typical AI application, your server publishes responses to many distinct channels, often one per user session. When your server only publishes to a channel without subscribing, the SDK uses a [transient publish](/docs/pub-sub/advanced#transient-publish). Transient publishes do not count toward the limit on [number of channels per connection](/docs/platform/pricing/limits#connection). + +All message actions use the same transient publish path, including `publish()` and `appendMessage()`. This means a single connection can publish to thousands of distinct channels without hitting the channel limit. No additional configuration is required. When you call `publish()` or `appendMessage()` on a channel that the client has not explicitly attached to, the SDK handles the transient attachment automatically. + +The constraint to be aware of is the [per-connection inbound message rate](/docs/platform/pricing/limits#connection), not the number of channels. + + + +## Per-connection rate limits + +Each connection has an [inbound message rate limit](/docs/platform/pricing/limits#connection) that caps how many messages per second can be published on that connection. How this limit interacts with your server's publish rate depends on which token streaming pattern you use. + +### Message per response + +With [message per response](/docs/ai-transport/token-streaming/message-per-response), your server calls `appendMessage()` at whatever rate the model produces tokens. Ably concatenates all appends received within a configurable time window, called the `appendRollupWindow`, into a single message before publishing it to subscribers. 
This is similar to [server-side batching](/docs/messages/batch#server-side), but happens at the edge of Ably's network where the connection is handled, before the message is published onwards. The connection's [inbound message rate limit](/docs/platform/pricing/limits#connection) applies to these rolled-up messages, not to individual `appendMessage()` calls. + +For example, if your server publishes appends at 100 msg/s with a 40ms rollup window: + +- 40ms window = 25 windows per second (1000ms / 40ms). +- Ably produces 25 rolled-up messages per second. +- Each rolled-up message contains approximately 4 tokens (100 / 25). +- The 25 msg/s rolled-up rate is what counts against the connection's inbound message rate limit. + +The following table shows how the `appendRollupWindow` affects the delivery rate to subscribers, concurrent stream capacity, and the trade-off in delivery granularity: + +| `appendRollupWindow` | Client delivery rate | Tokens per delivery (at 100 tok/s) | Concurrent streams per connection | +| --- | --- | --- | --- | +| 40ms *(default)* | 25 msg/s | ~4 | 2 | +| 100ms | 10 msg/s | ~10 | 5 | +| 200ms | 5 msg/s | ~20 | 10 | +| 500ms *(max)* | 2 msg/s | ~50 | 25 | + +_Concurrent streams per connection is calculated based on the connection's [max inbound rate limit](/docs/platform/pricing/limits#connection)._ + +A longer rollup window allows more concurrent streams per connection, but subscribers receive tokens in larger, less frequent batches. The default 40ms window provides smooth delivery for up to two concurrent streams. Increasing the window to 100ms or 200ms accommodates more streams with a modest reduction in delivery granularity. 
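The worked example above can be captured in a few lines of arithmetic. This helper is illustrative only, not an SDK API, and the default 50 msg/s inbound limit is an assumption consistent with the stream counts in the table; check your package's actual limit:

```javascript
// Estimate delivery rate and per-connection stream capacity for a given
// rollup window. Figures are approximations: real rollup depends on when
// tokens actually arrive within each window.
function rollupEstimate(tokensPerSecond, windowMs, inboundLimitPerSecond = 50) {
  // Delivery rate is capped by both the window and the model's output rate.
  const deliveriesPerSecond = Math.min(1000 / windowMs, tokensPerSecond);
  return {
    deliveriesPerSecond,
    tokensPerDelivery: tokensPerSecond / deliveriesPerSecond,
    streamsPerConnection: Math.floor(inboundLimitPerSecond / deliveriesPerSecond),
  };
}

// rollupEstimate(100, 40) → 25 deliveries/s, ~4 tokens each, 2 streams
```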
+ +Set the rollup window when creating the client: + + +```javascript +const ably = new Ably.Realtime({ + key: '{{API_KEY}}', + echoMessages: false, + transportParams: { appendRollupWindow: 100 } +}); +``` +```python +ably = AblyRealtime( + key='{{API_KEY}}', + echo_messages=False, + transport_params={'appendRollupWindow': 100} +) +``` +```java +ClientOptions options = new ClientOptions(); +options.key = "{{API_KEY}}"; +options.echoMessages = false; +options.transportParams = Map.of("appendRollupWindow", "100"); +AblyRealtime ably = new AblyRealtime(options); +``` + + +For full detail on rollup configuration, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#rollup). + +### Message per token + +With [message per token](/docs/ai-transport/token-streaming/message-per-token), each token is a separate `publish()` call with no rollup. Every publish counts directly against the [connection inbound message rate limit](/docs/platform/pricing/limits#connection). A model outputting 50 tokens per second saturates a single connection, leaving no capacity for additional streams. + +Account for your model's peak output rate when determining how many concurrent streams a single connection can support. For strategies to stay within limits, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#per-token). + +## Multiple concurrent streams + +When your server handles more concurrent AI response streams than a single connection supports, create additional Realtime clients. Each client uses its own connection with its own message rate budget, so throughput scales linearly with the number of connections. + +Route channels to connections using consistent hashing so that all operations for a given channel always go through the same connection. This preserves [message ordering](#ordering) for each response. 
+ + +```javascript +class AblyConnectionPool { + constructor(size, options) { + this._connections = Array.from({ length: size }, (_, i) => { + const client = new Ably.Realtime(options); + + client.connection.on((stateChange) => { + console.warn(`[Pool conn ${i}] ${stateChange.previous} → ${stateChange.current}`); + if (stateChange.current === 'failed') { + this._replaceConnection(i, options); + } + }); + + return client; + }); + + this._channelMap = new Map(); + this._options = options; + } + + _replaceConnection(index) { + const oldClient = this._connections[index]; + + const affectedChannels = []; + for (const [channelName, connIndex] of this._channelMap) { + if (connIndex === index) affectedChannels.push(channelName); + } + + try { oldClient.close(); } catch (e) { /* already dead */ } + + const newClient = new Ably.Realtime(this._options); + newClient.connection.on((stateChange) => { + console.warn(`[Pool conn ${index}] ${stateChange.previous} → ${stateChange.current}`); + if (stateChange.current === 'failed') { + this._replaceConnection(index); + } + }); + this._connections[index] = newClient; + + console.warn(`[Pool] Replaced conn ${index}, ${affectedChannels.length} channels affected`); + } + + _hashCode(str) { + let hash = 0; + for (let i = 0; i < str.length; i++) { + hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0; + } + return Math.abs(hash); + } + + getClient(channelName) { + const index = this._hashCode(channelName) % this._connections.length; + this._channelMap.set(channelName, index); + return this._connections[index]; + } + + getChannel(channelName, channelOptions) { + const client = this.getClient(channelName); + return client.channels.get(channelName, channelOptions); + } + + releaseChannel(channelName) { + const index = this._channelMap.get(channelName); + if (index !== undefined) { + const client = this._connections[index]; + client.channels.release(channelName); + this._channelMap.delete(channelName); + } + } + + async close() { + const 
detachPromises = []; + for (const [channelName, index] of this._channelMap) { + const client = this._connections[index]; + const channel = client.channels.get(channelName); + detachPromises.push(channel.detach().catch(() => {})); + } + await Promise.allSettled(detachPromises); + this._channelMap.clear(); + + for (const client of this._connections) { + client.close(); + } + } + + getStatus() { + return this._connections.map((client, i) => ({ + index: i, + state: client.connection.state, + channels: [...this._channelMap.entries()] + .filter(([, idx]) => idx === i) + .map(([name]) => name), + })); + } +} + +const pool = new AblyConnectionPool(10, { + key: 'your-api-key', + echoMessages: false, + transportParams: { appendRollupWindow: 100 }, +}); + +// Publish a streaming response +const channel = pool.getChannel('ai:session-123'); + +// Ensure channel rule has "Message annotations, updates, deletes and appends" enabled +// for the ai:* namespace in your Ably dashboard +const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' }); + +for await (const event of stream) { + if (event.type === 'token') { + channel.appendMessage({ serial: msgSerial, data: event.text }); + } +} + +// When session ends +pool.releaseChannel('ai:session-123'); + +// On server shutdown +await pool.close(); +``` + + +### Size the connection pool + +The number of connections you need depends on your peak concurrent streams and your per-stream message rate. 
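As a rough sketch of that sizing rule (illustrative only; the per-connection stream capacities come from the rollup discussion earlier on this page):

```javascript
// How many pooled clients are needed for a target number of concurrent
// streams, given how many streams one connection can carry (for example,
// 2 at the default 40ms rollup window).
function connectionsNeeded(peakConcurrentStreams, streamsPerConnection) {
  return Math.ceil(peakConcurrentStreams / streamsPerConnection);
}

// connectionsNeeded(40, 2) → 20 clients at the default rollup window
```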
+ +For [message per response](/docs/ai-transport/token-streaming/message-per-response), the rollup window controls per-stream message rate: + +| `appendRollupWindow` | Streams per connection | Streams per 5 conns | Streams per 10 conns | Streams per 20 conns | +| --- | --- | --- | --- | --- | +| 40ms *(default)* | 2 | 10 | 20 | 40 | +| 100ms | 5 | 25 | 50 | 100 | +| 200ms | 10 | 50 | 100 | 200 | + +For [message per token](/docs/ai-transport/token-streaming/message-per-token), divide your [connection inbound message rate](/docs/platform/pricing/limits#connection) by the model's peak output rate to determine how many streams fit per connection. If a model outputs 50 tokens per second, a single stream saturates one connection. + From f5ac792c53928e98d93dd60e02f64aaff8cb0e48 Mon Sep 17 00:00:00 2001 From: zak Date: Mon, 2 Mar 2026 16:49:03 +0000 Subject: [PATCH 2/3] Address PR review feedback for server publishing guide Simplify the transient publishing section to clarify when transient publish is used and why the per-connection inbound message rate is the binding constraint. Rename AblyConnectionPool to AblyClientPool for accuracy since the pool manages client instances. Fix "consistent hashing" terminology to "hash function" since the implementation uses modulo hashing. Remove the _replaceConnection method which could cause stack overflow and is unlikely to recover from network failures. 
--- .../token-streaming/server-publishing.mdx | 60 ++++++------------- 1 file changed, 17 insertions(+), 43 deletions(-) diff --git a/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx index df7587da52..ec01f1ecdf 100644 --- a/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx +++ b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx @@ -48,16 +48,16 @@ For more detail on how Ably preserves message order across its globally distribu ## Transient publishing and channel limits -In a typical AI application, your server publishes responses to many distinct channels, often one per user session. When your server only publishes to a channel without subscribing, the SDK uses a [transient publish](/docs/pub-sub/advanced#transient-publish). Transient publishes do not count toward the limit on [number of channels per connection](/docs/platform/pricing/limits#connection). - -All message actions use the same transient publish path, including `publish()` and `appendMessage()`. This means a single connection can publish to thousands of distinct channels without hitting the channel limit. No additional configuration is required. When you call `publish()` or `appendMessage()` on a channel that the client has not explicitly attached to, the SDK handles the transient attachment automatically. - -The constraint to be aware of is the [per-connection inbound message rate](/docs/platform/pricing/limits#connection), not the number of channels. +In a typical AI application, your server publishes responses to many distinct channels, often one per user session. When your server publishes to a channel without attaching first, the SDK uses a [transient publish](/docs/pub-sub/advanced#transient-publish). Transient publishes do not count toward the limit on the [number of channels per connection](/docs/platform/pricing/limits#connection). 
+Because transient publishing removes the channel limit as a constraint, the limiting factor becomes the [per-connection inbound message rate](/docs/platform/pricing/limits#connection). Typically, each channel represents a conversation between a user and an agent, so a single connection publishes to multiple channels simultaneously. Since the per-connection and per-channel inbound rate limits are the same value, you are likely to hit the connection limit first because it is shared across all channels on that connection. + +See [per-connection rate limits](#rate-limits) for how this interacts with each token streaming pattern. + ## Per-connection rate limits Each connection has an [inbound message rate limit](/docs/platform/pricing/limits#connection) that caps how many messages per second can be published on that connection. How this limit interacts with your server's publish rate depends on which token streaming pattern you use. @@ -124,49 +124,23 @@ Account for your model's peak output rate when determining how many concurrent s When your server handles more concurrent AI response streams than a single connection supports, create additional Realtime clients. Each client uses its own connection with its own message rate budget, so throughput scales linearly with the number of connections. -Route channels to connections using consistent hashing so that all operations for a given channel always go through the same connection. This preserves [message ordering](#ordering) for each response. +Route channels to clients using a hash function so that all operations for a given channel always go through the same client. This preserves [message ordering](#ordering) for each response. 
```javascript -class AblyConnectionPool { +class AblyClientPool { constructor(size, options) { - this._connections = Array.from({ length: size }, (_, i) => { + this._clients = Array.from({ length: size }, (_, i) => { const client = new Ably.Realtime(options); client.connection.on((stateChange) => { - console.warn(`[Pool conn ${i}] ${stateChange.previous} → ${stateChange.current}`); - if (stateChange.current === 'failed') { - this._replaceConnection(i, options); - } + console.warn(`[Pool client ${i}] ${stateChange.previous} → ${stateChange.current}`); }); return client; }); this._channelMap = new Map(); - this._options = options; - } - - _replaceConnection(index) { - const oldClient = this._connections[index]; - - const affectedChannels = []; - for (const [channelName, connIndex] of this._channelMap) { - if (connIndex === index) affectedChannels.push(channelName); - } - - try { oldClient.close(); } catch (e) { /* already dead */ } - - const newClient = new Ably.Realtime(this._options); - newClient.connection.on((stateChange) => { - console.warn(`[Pool conn ${index}] ${stateChange.previous} → ${stateChange.current}`); - if (stateChange.current === 'failed') { - this._replaceConnection(index); - } - }); - this._connections[index] = newClient; - - console.warn(`[Pool] Replaced conn ${index}, ${affectedChannels.length} channels affected`); } _hashCode(str) { @@ -178,9 +152,9 @@ class AblyConnectionPool { } getClient(channelName) { - const index = this._hashCode(channelName) % this._connections.length; + const index = this._hashCode(channelName) % this._clients.length; this._channelMap.set(channelName, index); - return this._connections[index]; + return this._clients[index]; } getChannel(channelName, channelOptions) { @@ -191,7 +165,7 @@ class AblyConnectionPool { releaseChannel(channelName) { const index = this._channelMap.get(channelName); if (index !== undefined) { - const client = this._connections[index]; + const client = this._clients[index]; 
client.channels.release(channelName); this._channelMap.delete(channelName); } @@ -200,20 +174,20 @@ class AblyConnectionPool { async close() { const detachPromises = []; for (const [channelName, index] of this._channelMap) { - const client = this._connections[index]; + const client = this._clients[index]; const channel = client.channels.get(channelName); detachPromises.push(channel.detach().catch(() => {})); } await Promise.allSettled(detachPromises); this._channelMap.clear(); - for (const client of this._connections) { + for (const client of this._clients) { client.close(); } } getStatus() { - return this._connections.map((client, i) => ({ + return this._clients.map((client, i) => ({ index: i, state: client.connection.state, channels: [...this._channelMap.entries()] @@ -223,7 +197,7 @@ class AblyConnectionPool { } } -const pool = new AblyConnectionPool(10, { +const pool = new AblyClientPool(10, { key: 'your-api-key', echoMessages: false, transportParams: { appendRollupWindow: 100 }, From d022ed9d7ebddb0359e34aa1dcb72e602c2b791b Mon Sep 17 00:00:00 2001 From: zak Date: Wed, 4 Mar 2026 09:49:25 +0000 Subject: [PATCH 3/3] Deduplicate rate limit and pool content Move detailed rate limit content (rollup tables, code examples, concurrent streams data) from server-publishing.mdx to token-rate-limits.mdx where it belongs. Replace the full AblyClientPool implementation with concise conceptual guidance, as the pool abstraction will be implemented in ably-js instead. server-publishing.mdx now links to token-rate-limits.mdx for rollup configuration details and retains a brief overview section for each token streaming pattern. 
--- .../token-streaming/server-publishing.mdx | 183 +----------------- .../token-streaming/token-rate-limits.mdx | 32 +-- 2 files changed, 28 insertions(+), 187 deletions(-) diff --git a/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx index ec01f1ecdf..7142b78bd2 100644 --- a/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx +++ b/src/pages/docs/ai-transport/token-streaming/server-publishing.mdx @@ -42,7 +42,7 @@ Set [`echoMessages`](/docs/api/realtime-sdk/types#client-options) to `false` on Ably guarantees that messages published on a single connection are delivered to subscribers in the order they were published. This ordering guarantee is essential for token streaming, because tokens must arrive in sequence for the final message to be correct. -This guarantee is per-connection. If you use [multiple connections](#multiple-streams) to handle higher concurrency, route all operations for a given channel through the same connection. This ensures that all `publish()` and `appendMessage()` calls for a given response maintain their order. +This guarantee is per-connection. If you use [multiple clients](#multiple-streams) to handle higher concurrency, route all operations for a given channel through the same client. This ensures that all `publish()` and `appendMessage()` calls for a given response maintain their order. For more detail on how Ably preserves message order across its globally distributed infrastructure, see [message ordering](/docs/platform/architecture/message-ordering). @@ -54,187 +54,18 @@ In a typical AI application, your server publishes responses to many distinct ch The server must attach to the channel in order to subscribe to it. In this case, the SDK client instance will not use transient publishing. 
-Because transient publishing removes the channel limit as a constraint, the limiting factor becomes the [per-connection inbound message rate](/docs/platform/pricing/limits#connection). Typically, each channel represents a conversation between a user and an agent, so a single connection publishes to multiple channels simultaneously. Since the per-connection and per-channel inbound rate limits are the same value, you are likely to hit the connection limit first because it is shared across all channels on that connection. - -See [per-connection rate limits](#rate-limits) for how this interacts with each token streaming pattern. +Because transient publishing removes the channel limit as a constraint, the limiting factor becomes the [per-connection inbound message rate](/docs/platform/pricing/limits#connection). Typically, each channel represents a conversation between a user and an agent, so a single connection publishes to multiple channels simultaneously. The per-connection and per-channel inbound rate limits are the same value, but the connection limit is shared across all channels on that connection, making it the constraint you will reach first. See [per-connection rate limits](#rate-limits) for how this interacts with each token streaming pattern. ## Per-connection rate limits -Each connection has an [inbound message rate limit](/docs/platform/pricing/limits#connection) that caps how many messages per second can be published on that connection. How this limit interacts with your server's publish rate depends on which token streaming pattern you use. - -### Message per response - -With [message per response](/docs/ai-transport/token-streaming/message-per-response), your server calls `appendMessage()` at whatever rate the model produces tokens. Ably concatenates all appends received within a configurable time window, called the `appendRollupWindow`, into a single message before publishing it to subscribers. 
This is similar to [server-side batching](/docs/messages/batch#server-side), but happens at the edge of Ably's network where the connection is handled, before the message is published onwards. The connection's [inbound message rate limit](/docs/platform/pricing/limits#connection) applies to these rolled-up messages, not to individual `appendMessage()` calls. - -For example, if your server publishes appends at 100 msg/s with a 40ms rollup window: - -- 40ms window = 25 windows per second (1000ms / 40ms). -- Ably produces 25 rolled-up messages per second. -- Each rolled-up message contains approximately 4 tokens (100 / 25). -- The 25 msg/s rolled-up rate is what counts against the connection's inbound message rate limit. - -The following table shows how the `appendRollupWindow` affects the delivery rate to subscribers, concurrent stream capacity, and the trade-off in delivery granularity: - -| `appendRollupWindow` | Client delivery rate | Tokens per delivery (at 100 tok/s) | Concurrent streams per connection | -| --- | --- | --- | --- | -| 40ms *(default)* | 25 msg/s | ~4 | 2 | -| 100ms | 10 msg/s | ~10 | 5 | -| 200ms | 5 msg/s | ~20 | 10 | -| 500ms *(max)* | 2 msg/s | ~50 | 25 | - -_Concurrent streams per connection is calculated based on the connection's [max inbound rate limit](/docs/platform/pricing/limits#connection)._ - -A longer rollup window allows more concurrent streams per connection, but subscribers receive tokens in larger, less frequent batches. The default 40ms window provides smooth delivery for up to two concurrent streams. Increasing the window to 100ms or 200ms accommodates more streams with a modest reduction in delivery granularity. 
- -Set the rollup window when creating the client: - - -```javascript -const ably = new Ably.Realtime({ - key: '{{API_KEY}}', - echoMessages: false, - transportParams: { appendRollupWindow: 100 } -}); -``` -```python -ably = AblyRealtime( - key='{{API_KEY}}', - echo_messages=False, - transport_params={'appendRollupWindow': 100} -) -``` -```java -ClientOptions options = new ClientOptions(); -options.key = "{{API_KEY}}"; -options.echoMessages = false; -options.transportParams = Map.of("appendRollupWindow", "100"); -AblyRealtime ably = new AblyRealtime(options); -``` - - -For full detail on rollup configuration, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#rollup). +Each connection has an [inbound message rate limit](/docs/platform/pricing/limits#connection) that caps how many messages per second can be published on that connection. How this limit interacts with your publish rate depends on the token streaming pattern you use: -### Message per token - -With [message per token](/docs/ai-transport/token-streaming/message-per-token), each token is a separate `publish()` call with no rollup. Every publish counts directly against the [connection inbound message rate limit](/docs/platform/pricing/limits#connection). A model outputting 50 tokens per second saturates a single connection, leaving no capacity for additional streams. - -Account for your model's peak output rate when determining how many concurrent streams a single connection can support. For strategies to stay within limits, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#per-token). +- With [message per response](/docs/ai-transport/token-streaming/message-per-response), Ably rolls up multiple appends into fewer published messages. The rollup window determines how many concurrent streams a single connection can support. 
See [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#rollup) for rollup configuration and concurrent stream capacity. +- With [message per token](/docs/ai-transport/token-streaming/message-per-token), each token is a separate publish with no rollup. Every publish counts directly against the connection's rate limit. See [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#per-token) for strategies to stay within limits. ## Multiple concurrent streams -When your server handles more concurrent AI response streams than a single connection supports, create additional Realtime clients. Each client uses its own connection with its own message rate budget, so throughput scales linearly with the number of connections. - -Route channels to clients using a hash function so that all operations for a given channel always go through the same client. This preserves [message ordering](#ordering) for each response. - - -```javascript -class AblyClientPool { - constructor(size, options) { - this._clients = Array.from({ length: size }, (_, i) => { - const client = new Ably.Realtime(options); - - client.connection.on((stateChange) => { - console.warn(`[Pool client ${i}] ${stateChange.previous} → ${stateChange.current}`); - }); - - return client; - }); - - this._channelMap = new Map(); - } - - _hashCode(str) { - let hash = 0; - for (let i = 0; i < str.length; i++) { - hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0; - } - return Math.abs(hash); - } - - getClient(channelName) { - const index = this._hashCode(channelName) % this._clients.length; - this._channelMap.set(channelName, index); - return this._clients[index]; - } - - getChannel(channelName, channelOptions) { - const client = this.getClient(channelName); - return client.channels.get(channelName, channelOptions); - } - - releaseChannel(channelName) { - const index = this._channelMap.get(channelName); - if (index !== undefined) { - const client = this._clients[index]; - 
client.channels.release(channelName); - this._channelMap.delete(channelName); - } - } - - async close() { - const detachPromises = []; - for (const [channelName, index] of this._channelMap) { - const client = this._clients[index]; - const channel = client.channels.get(channelName); - detachPromises.push(channel.detach().catch(() => {})); - } - await Promise.allSettled(detachPromises); - this._channelMap.clear(); - - for (const client of this._clients) { - client.close(); - } - } - - getStatus() { - return this._clients.map((client, i) => ({ - index: i, - state: client.connection.state, - channels: [...this._channelMap.entries()] - .filter(([, idx]) => idx === i) - .map(([name]) => name), - })); - } -} - -const pool = new AblyClientPool(10, { - key: 'your-api-key', - echoMessages: false, - transportParams: { appendRollupWindow: 100 }, -}); - -// Publish a streaming response -const channel = pool.getChannel('ai:session-123'); - -// Ensure channel rule has "Message annotations, updates, deletes and appends" enabled -// for the ai:* namespace in your Ably dashboard -const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' }); - -for await (const event of stream) { - if (event.type === 'token') { - channel.appendMessage({ serial: msgSerial, data: event.text }); - } -} - -// When session ends -pool.releaseChannel('ai:session-123'); - -// On server shutdown -await pool.close(); -``` - - -### Size the connection pool - -The number of connections you need depends on your peak concurrent streams and your per-stream message rate. 
- -For [message per response](/docs/ai-transport/token-streaming/message-per-response), the rollup window controls per-stream message rate: - -| `appendRollupWindow` | Streams per connection | Streams per 5 conns | Streams per 10 conns | Streams per 20 conns | -| --- | --- | --- | --- | --- | -| 40ms *(default)* | 2 | 10 | 20 | 40 | -| 100ms | 5 | 25 | 50 | 100 | -| 200ms | 10 | 50 | 100 | 200 | +When your server handles more concurrent AI response streams than a single connection supports, create additional Realtime clients. Each client uses its own connection with its own message rate budget, so throughput scales linearly with the number of clients. -For [message per token](/docs/ai-transport/token-streaming/message-per-token), divide your [connection inbound message rate](/docs/platform/pricing/limits#connection) by the model's peak output rate to determine how many streams fit per connection. If a model outputs 50 tokens per second, a single stream saturates one connection. +Route all operations for a given channel through the same client to preserve [message ordering](#ordering). Use a hash of the channel name to deterministically select a client, ensuring that all `publish()` and `appendMessage()` calls for a given response go through the same connection. diff --git a/src/pages/docs/ai-transport/token-streaming/token-rate-limits.mdx b/src/pages/docs/ai-transport/token-streaming/token-rate-limits.mdx index b6f095d170..3e17d709cc 100644 --- a/src/pages/docs/ai-transport/token-streaming/token-rate-limits.mdx +++ b/src/pages/docs/ai-transport/token-streaming/token-rate-limits.mdx @@ -16,26 +16,36 @@ The limits in the second category, however, cannot be increased arbitrarily and ## Message-per-response -The [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern includes automatic rate limit protection. 
AI Transport prevents a single response stream from reaching the message rate limit for a connection by rolling up multiple appends into a single published message: +The [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern includes automatic rate limit protection. AI Transport rolls up multiple appends into a single published message before applying [connection](/docs/platform/pricing/limits#connection) and [channel](/docs/platform/pricing/limits#channel) rate limits. This means the rate limits apply to the rolled-up messages, not to individual `appendMessage()` calls. The rollup works as follows: 1. Your agent streams tokens to the channel at the model's output rate -2. Ably publishes the first token immediately, then automatically rolls up subsequent tokens on receipt -3. Clients receive the same content, delivered in fewer discrete messages +2. Ably publishes the first token immediately, then automatically rolls up subsequent tokens as they are received +3. Clients receive the same content, delivered in fewer discrete messages and as larger contiguous chunks -By default, Ably delivers a single response stream at 25 messages per second or the model output rate, whichever is lower. This means you can publish two simultaneous response streams on the same channel or connection with any [Ably package](/docs/platform/pricing#packages), because each stream uses half of the [connection inbound message rate](/docs/platform/pricing/limits#connection). Ably charges for the number of published messages, not for the number of streamed tokens. +By default, Ably delivers a single response stream at 25 messages per second or the model output rate, whichever is lower. This means you can publish two simultaneous response streams on the same channel or connection with any [Ably package](/docs/platform/pricing#packages), because each stream uses half of the [connection inbound message rate](/docs/platform/pricing/limits#connection). 
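The arithmetic behind these defaults can be sketched as follows. This is an illustrative sketch, not SDK code: the 50 msg/s connection inbound rate is inferred from the statement that each stream uses half of it, and the actual limit depends on your [Ably package](/docs/platform/pricing#packages).

```javascript
// Sketch of the rollup arithmetic described above. The 50 msg/s
// connection inbound rate is inferred from "each stream uses half of
// the connection inbound message rate"; check your package's limits.
function rolledUpRate(tokenRate, rollupWindowMs = 40) {
  // At most one rolled-up message per window, and never more than
  // the model's token output rate
  const windowsPerSecond = 1000 / rollupWindowMs;
  return Math.min(windowsPerSecond, tokenRate);
}

function streamsPerConnection(tokenRate, rollupWindowMs = 40, inboundLimit = 50) {
  // Concurrent streams that fit within a connection's inbound message rate
  return Math.floor(inboundLimit / rolledUpRate(tokenRate, rollupWindowMs));
}

console.log(rolledUpRate(50));              // 25 messages/s per stream
console.log(streamsPerConnection(50));      // 2 streams with the default 40ms window
console.log(streamsPerConnection(50, 500)); // 25 streams at the 500ms maximum
```

A model producing fewer tokens per second than one per window is simply delivered at its own output rate, per the `Math.min()` above.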
Ably charges for the number of published messages after rollup, not for the number of streamed tokens. ### Configure rollup behaviour Ably concatenates all appends for a single response that are received during the rollup window into one published message. You can specify the rollup window for a particular connection by setting the `appendRollupWindow` [transport parameter](/docs/api/realtime-sdk#client-options). This allows you to determine how much of the connection message rate can be consumed by a single response stream and control your consumption costs. +For example, if your server publishes appends at 100 tokens per second with a 40ms rollup window: -| `appendRollupWindow` | Maximum message rate for a single response | -|---|---| -| 0ms | Model output rate | -| 20ms | 50 messages/s | -| 40ms *(default)* | 25 messages/s | -| 100ms | 10 messages/s | -| 500ms *(max)* | 2 messages/s | +- 40ms window = 25 windows per second (1000ms / 40ms) +- Ably produces 25 rolled-up messages per second, each containing approximately 4 tokens +- The 25 msg/s rolled-up rate is what counts against the connection's inbound message rate limit + +The following table shows how different rollup windows affect the rate of messages received by subscribers and the number of concurrent token streams a single connection can support, assuming the model output is 50 tokens per second or greater: + +| `appendRollupWindow` | Subscriber delivery rate | Concurrent streams per connection | +|---|---|---| +| 20ms | 50 msg/s | 1 | +| 40ms *(default)* | 25 msg/s | 2 | +| 100ms | 10 msg/s | 5 | +| 500ms *(max)* | 2 msg/s | 25 | + +_Concurrent streams per connection is calculated based on the [connection inbound message rate limit](/docs/platform/pricing/limits#connection)._ + +A longer rollup window allows more concurrent streams per connection, but subscribers receive tokens in larger, less frequent batches. The default 40ms window provides smooth delivery for up to two concurrent streams. 
Increasing the window, up to the 500ms maximum, accommodates more streams with a modest reduction in delivery granularity. The following example code demonstrates establishing a connection to Ably with `appendRollupWindow` set to 100ms: