Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add message pagination over a shared cursor core: new JSON-RPC `chat/history` method and optional `limit`/`before`/`after` window params on `chat/open`, plus opt-in pagination on remote HTTP `GET /api/v1/chats/:id`. Opaque cursors with a `lastCompaction` sentinel; meta exposes before/after/compaction cursors.

## 0.140.1

- MCP: recover remote servers with stale connections (e.g. after suspend): tool-call timeouts now probe and re-initialize; add `mcpKeepAliveSeconds` pings; 404 triggers re-init.
Expand Down
95 changes: 95 additions & 0 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -1986,6 +1986,11 @@ client indicator stays in sync with the auto-approval behavior the server will
apply. Typically used after `chat/list` when the user selects a chat that has
not been opened in the current client session.

By default the full history is replayed. A client may instead pass the optional
`limit`/`before`/`after` window parameters (same cursors as `chat/history`) to
replay only the newest page and avoid streaming a large history; in that case
the response carries `meta` so the client can page older via `chat/history`.

_Request:_

* method: `chat/open`
Expand All @@ -1997,6 +2002,18 @@ interface ChatOpenParams {
* The chat session identifier to open.
*/
chatId: string;

/**
* Optional. Max messages to replay. When any of limit/before/after is
* present, only that window is replayed and `meta` is returned.
*/
limit?: number;

/** Optional opaque cursor — replay the page older than the cursor. */
before?: string;

/** Optional opaque cursor — replay the page newer than the cursor. */
after?: string;
}
```

Expand All @@ -2016,6 +2033,84 @@ interface ChatOpenResponse {

/** The chat title at the time of replay, when available. */
title?: string;

/** Pagination metadata, present only when window params were supplied. */
meta?: ChatHistoryMeta;

/** Present instead of replaying when a supplied cursor is stale. */
error?: { code: 'cursor_expired'; message: string };
}
```

### Chat history (↩️)

A client request to fetch a window of a persisted chat's history as a single
response (no streaming), so an editor can hydrate a bounded view and page older
on demand instead of receiving the full replay. The returned `contents` are the
same `ChatContent` items delivered by `chat/contentReceived`, so the client
reuses its existing renderer; live updates keep arriving over `chat/contentReceived`.

Pagination uses opaque cursors. The page is anchored to the newest messages in
the selected window; an opaque `after` cursor pages forward instead. The literal
sentinel `"lastCompaction"` may be passed as `before`/`after`: `after: "lastCompaction"`
returns the messages since the last compaction (the active context),
`before: "lastCompaction"` the summarized-away history. Cursors are ephemeral —
a cursor whose message no longer exists (e.g. after a rollback or clear) returns
a `cursor_expired` error, prompting the client to refetch the latest page.

_Request:_

* method: `chat/history`
* params: `ChatHistoryParams` defined as follows:

```typescript
interface ChatHistoryParams {
/** The chat session identifier to fetch. */
chatId: string;

/** Optional. Max messages to return in the page. */
limit?: number;

/** Optional opaque cursor (or "lastCompaction") — page older. */
before?: string;

/** Optional opaque cursor (or "lastCompaction") — page newer. */
after?: string;
}
```

_Response:_

```typescript
interface ChatHistoryResponse {
/**
* The page of history, as the same content items streamed by
* chat/contentReceived. Absent when `error` is present.
*/
contents?: ChatContentReceived[];

/** Pagination metadata. Absent when `error` is present. */
meta?: ChatHistoryMeta;

/** Present when the chat is unknown or a supplied cursor is stale. */
error?: { code: 'chat_not_found' | 'cursor_expired'; message: string };
}

interface ChatHistoryMeta {
/** Total messages in the full history. */
total: number;

/** Number of messages in this page. */
returned: number;

/** Cursor to pass as `before` to load older; null at the start. */
beforeCursor: string | null;

/** Cursor to pass as `after` to load newer; null at the tail. */
afterCursor: string | null;

/** Cursor at the last compaction boundary; null when never compacted. */
compactionCursor: string | null;
}
```

Expand Down
115 changes: 90 additions & 25 deletions src/eca/features/chat.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[eca.config :as config]
[eca.db :as db]
[eca.features.background-tasks :as bg]
[eca.features.chat.history :as history]
[eca.features.chat.lifecycle :as lifecycle]
[eca.features.chat.tool-calls :as tc]
[eca.features.commands :as f.commands]
Expand Down Expand Up @@ -255,26 +256,43 @@
:text (:text message-content)
:contentId content-id}}]))

(defn messages->contents
"Pure transform of persisted `messages` into the flat sequence of chat-content
notification payloads ({:chat-id :role :content :parent-chat-id?}) that the
streaming replay sends, including nested subagent expansion. `db` is a plain
db snapshot used only to resolve subagent message histories.

Shared by the streaming replay (`send-chat-contents!`) and the request/response
`chat/history` method so both render identical content."
[messages {:keys [chat-id parent-chat-id db]}]
(let [->payload (fn [{:keys [role content]}]
(assoc-some {:chat-id chat-id :role role :content content}
:parent-chat-id parent-chat-id))]
(into []
(mapcat
(fn [message]
(let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message))
subagent-chat-id (when (= "tool_call_output" (:role message))
(get-in message [:content :details :subagent-chat-id]))
subagent-messages (when subagent-chat-id
(get-in db [:chats subagent-chat-id :messages]))]
(if (some? subagent-messages)
;; For subagent tool calls: toolCallRun + toolCallRunning, then
;; subagent messages, then toolCalled — matching live execution order.
(concat (map ->payload (butlast chat-contents))
(messages->contents subagent-messages
{:chat-id subagent-chat-id
:parent-chat-id chat-id
:db db})
[(->payload (last chat-contents))])
(map ->payload chat-contents))))
messages))))

(defn ^:private send-chat-contents! [messages chat-ctx]
(doseq [message messages]
(let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message))
subagent-chat-id (when (= "tool_call_output" (:role message))
(get-in message [:content :details :subagent-chat-id]))]
(if-let [subagent-messages (when subagent-chat-id
(get-in @(:db* chat-ctx) [:chats subagent-chat-id :messages]))]
;; For subagent tool calls: send toolCallRun + toolCallRunning, then
;; subagent messages, then toolCalled — matching live execution order.
(let [before-called (butlast chat-contents)
called (last chat-contents)]
(doseq [{:keys [role content]} before-called]
(lifecycle/send-content! chat-ctx role content))
(send-chat-contents! subagent-messages
(assoc chat-ctx
:chat-id subagent-chat-id
:parent-chat-id (:chat-id chat-ctx)))
(lifecycle/send-content! chat-ctx (:role called) (:content called)))
(doseq [{:keys [role content]} chat-contents]
(lifecycle/send-content! chat-ctx role content))))))
(doseq [payload (messages->contents messages {:chat-id (:chat-id chat-ctx)
:parent-chat-id (:parent-chat-id chat-ctx)
:db @(:db* chat-ctx)})]
(messenger/chat-content-received (:messenger chat-ctx) payload)))

(defn default-model [db config]
(llm-api/default-model db config))
Expand Down Expand Up @@ -1906,14 +1924,30 @@
selected model to the resumed chat's stored `:model` via a `config/updated`
notification, so an Opus-started chat keeps using Opus on the next prompt
(#417). Performs no DB mutation otherwise.

Optional `:limit`/`:before`/`:after` window the replay (same cursors as
`chat/history`); when provided, the response includes `:meta` so the client
can page older via `chat/history`. Without them the full history is replayed.
Returns `{:found? false}` when the chat does not exist or is a subagent,
otherwise `{:found? true :chat-id ... :title ...}`."
[{:keys [chat-id]} db* messenger config]
(let [chat (get-in @db* [:chats chat-id])]
(if (or (nil? chat) (:subagent chat))
otherwise `{:found? true :chat-id ... :title ... :meta? ...}`."
[{:keys [chat-id limit before after] :as params} db* messenger config]
(let [chat (get-in @db* [:chats chat-id])
windowed? (some #(contains? params %) [:limit :before :after])
page (when windowed?
(history/window-messages (vec (:messages chat))
{:limit limit :before before :after after}))]
(cond
(or (nil? chat) (:subagent chat))
{:found? false}

(= :cursor-expired (:error page))
{:found? true :chat-id chat-id
:error {:code "cursor_expired"
:message "Cursor no longer points to an existing message; refetch the latest page"}}

:else
(let [title (:title chat)
messages (:messages chat)
messages (if windowed? (:messages page) (:messages chat))
chat-ctx {:chat-id chat-id :db* db* :messenger messenger}]
(mark-editor-open! db* chat-id)
(messenger/chat-cleared messenger {:chat-id chat-id :messages true})
Expand All @@ -1922,4 +1956,35 @@
(lifecycle/send-content! chat-ctx :system (assoc-some {:type :metadata} :title title))
(config/notify-selected-model-changed! (:model chat) db* messenger config (:variant chat))
(config/notify-selected-trust-changed! (:trust chat) db* messenger)
{:found? true :chat-id chat-id :title title}))))
(assoc-some {:found? true :chat-id chat-id :title title}
:meta (when windowed?
{:total (:total page)
:returned (:returned page)
:before-cursor (:before-cursor page)
:after-cursor (:after-cursor page)
:compaction-cursor (history/compaction-cursor (:messages chat))}))))))

(defn fetch-history
"Window a chat's persisted messages and return the transformed content items
plus pagination meta, for the request/response `chat/history` method.

`before`/`after` are opaque cursors (or the `lastCompaction` sentinel) and
`limit` bounds the page. Returns {:contents [...] :meta {...}}, or
{:error {:code :message}} when the chat is unknown or a cursor is stale."
[{:keys [chat-id limit before after]} db*]
(let [db @db*
chat (get-in db [:chats chat-id])]
(if (or (nil? chat) (:subagent chat))
{:error {:code "chat_not_found" :message (str "Chat " chat-id " not found")}}
(let [messages (vec (:messages chat))
result (history/window-messages messages {:limit limit :before before :after after})]
(if (= :cursor-expired (:error result))
{:error {:code "cursor_expired"
:message "Cursor no longer points to an existing message; refetch the latest page"}}
{:contents (messages->contents (:messages result)
{:chat-id chat-id :parent-chat-id nil :db db})
:meta {:total (:total result)
:returned (:returned result)
:before-cursor (:before-cursor result)
:after-cursor (:after-cursor result)
:compaction-cursor (history/compaction-cursor messages)}})))))
122 changes: 122 additions & 0 deletions src/eca/features/chat/history.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
(ns eca.features.chat.history
"Transport-agnostic windowing and cursor logic over a chat's persisted
messages, shared by the remote HTTP API and the JSON-RPC `chat/history`
method.

Cursors are opaque tokens: base64url of \"<index>.<checksum>\" where checksum
fingerprints the message. Resolving trusts the encoded index when its checksum
still matches, else rescans by checksum (tolerating shifts like flag
insert/remove); a cursor whose message is gone resolves to :expired."
(:require
[clojure.string :as string])
(:import
[java.nio.charset StandardCharsets]
[java.util Base64]))

(set! *warn-on-reflection* true)

(def last-compaction-sentinel
"Literal cursor value resolving to the position of the last compaction marker."
"lastCompaction")

(defn ^:private message-checksum
"Stable-enough fingerprint of a message to validate/resolve a cursor."
[message]
(Integer/toHexString (hash [(:role message) (:created-at message) (:content message)])))

(defn ^:private encode-cursor [idx message]
(let [raw (.getBytes (str idx "." (message-checksum message)) StandardCharsets/UTF_8)]
(.encodeToString (.withoutPadding (Base64/getUrlEncoder)) raw)))

(defn ^:private decode-cursor [^String cursor]
(try
(let [decoded (String. (.decode (Base64/getUrlDecoder) cursor) StandardCharsets/UTF_8)
[idx-s checksum] (string/split decoded #"\." 2)]
{:idx (Integer/parseInt idx-s) :checksum checksum})
(catch Exception _ nil)))

(defn ^:private last-compaction-idx
"Index of the last compact_marker message, or nil when never compacted."
[messages]
(loop [i (dec (count messages))]
(cond
(neg? i) nil
(= "compact_marker" (:role (nth messages i))) i
:else (recur (dec i)))))

(defn ^:private resolve-cursor
"Resolves an opaque cursor to an index, or :expired if its message is gone."
[messages ^String cursor]
(if-let [{:keys [idx checksum]} (decode-cursor cursor)]
(if (and (<= 0 idx) (< idx (count messages))
(= checksum (message-checksum (nth messages idx))))
idx
(or (first (keep-indexed (fn [i m] (when (= checksum (message-checksum m)) i)) messages))
:expired))
:expired))

(defn ^:private resolve-bound
"Resolves a `before`/`after` value to an index, :expired, or nil (absent).
The `lastCompaction` sentinel resolves to the last compaction marker, or
:no-compaction when the chat was never compacted."
[messages value]
(when value
(if (= value last-compaction-sentinel)
(or (last-compaction-idx messages) :no-compaction)
(resolve-cursor messages value))))

(defn ^:private coerce-limit
"Coerces a limit that may arrive as an int (JSON-RPC) or string (HTTP query)."
[value]
(when value
(try
(let [n (if (integer? value) value (Integer/parseInt (str value)))]
(when (pos? n) n))
(catch Exception _ nil))))

(defn window-messages
"Windows `messages` by the (after, before) cursors and `limit`.

`after` is an exclusive lower bound, `before` an exclusive upper bound. The
page is the newest `limit` messages in the window; an opaque `after` cursor as
the only bound pages forward (oldest `limit`). `before-cursor`/`after-cursor`
are computed against full history so paging crosses the compaction boundary,
and are nil at the ends.

Returns {:messages :before-cursor :after-cursor :total :returned} or
{:error :cursor-expired}."
[messages {:keys [limit before after]}]
(let [messages (vec messages)
n (count messages)
limit (coerce-limit limit)
after-res (resolve-bound messages after)
before-res (resolve-bound messages before)]
(if (or (= :expired after-res) (= :expired before-res))
{:error :cursor-expired}
(let [lo (if (integer? after-res) after-res -1) ;; exclusive lower
hi (if (integer? before-res) before-res n) ;; exclusive upper
win-start (max 0 (min (inc lo) n))
win-end (max win-start (min hi n))
;; Opaque `after` cursor pages forward; the sentinel stays newest-anchored.
forward? (and (integer? after-res)
(not= after last-compaction-sentinel)
(nil? before-res))
[slice-start slice-end]
(cond
(nil? limit) [win-start win-end]
forward? [win-start (min win-end (+ win-start limit))]
:else [(max win-start (- win-end limit)) win-end])
slice (subvec messages slice-start slice-end)]
{:messages slice
:total n
:returned (count slice)
:before-cursor (when (pos? slice-start)
(encode-cursor slice-start (nth messages slice-start)))
:after-cursor (when (< slice-end n)
(encode-cursor (dec slice-end) (nth messages (dec slice-end))))}))))

(defn compaction-cursor
"Opaque cursor for the last compaction boundary, or nil when never compacted."
[messages]
(when-let [idx (last-compaction-idx (vec messages))]
(encode-cursor idx (nth (vec messages) idx))))
Loading
Loading