diff --git a/CHANGELOG.md b/CHANGELOG.md index 9324c4f9e..1111adf88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add message pagination over a shared cursor core: new JSON-RPC `chat/history` method and optional `limit`/`before`/`after` window params on `chat/open`, plus opt-in pagination on remote HTTP `GET /api/v1/chats/:id`. Opaque cursors with a `lastCompaction` sentinel; meta exposes before/after/compaction cursors. + ## 0.140.1 - MCP: recover remote servers with stale connections (e.g. after suspend): tool-call timeouts now probe and re-initialize; add `mcpKeepAliveSeconds` pings; 404 triggers re-init. diff --git a/docs/protocol.md b/docs/protocol.md index 4ee727795..e3c2a6343 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -1986,6 +1986,11 @@ client indicator stays in sync with the auto-approval behavior the server will apply. Typically used after `chat/list` when the user selects a chat that has not been opened in the current client session. +By default the full history is replayed. A client may instead pass the optional +`limit`/`before`/`after` window parameters (same cursors as `chat/history`) to +replay only the newest page and avoid streaming a large history; in that case +the response carries `meta` so the client can page older via `chat/history`. + _Request:_ * method: `chat/open` @@ -1997,6 +2002,18 @@ interface ChatOpenParams { * The chat session identifier to open. */ chatId: string; + + /** + * Optional. Max messages to replay. When any of limit/before/after is + * present, only that window is replayed and `meta` is returned. + */ + limit?: number; + + /** Optional opaque cursor — replay the page older than the cursor. */ + before?: string; + + /** Optional opaque cursor — replay the page newer than the cursor. */ + after?: string; } ``` @@ -2016,6 +2033,84 @@ interface ChatOpenResponse { /** The chat title at the time of replay, when available. */ title?: string; + + /** Pagination metadata, present only when window params were supplied. */ + meta?: ChatHistoryMeta; + + /** Present instead of replaying when a supplied cursor is stale. */ + error?: { code: 'cursor_expired'; message: string }; +} +``` + +### Chat history (↩️) + +A client request to fetch a window of a persisted chat's history as a single +response (no streaming), so an editor can hydrate a bounded view and page older +on demand instead of receiving the full replay. The returned `contents` are the +same `ChatContent` items delivered by `chat/contentReceived`, so the client +reuses its existing renderer; live updates keep arriving over `chat/contentReceived`. + +Pagination uses opaque cursors. The page is anchored to the newest messages in +the selected window; an opaque `after` cursor pages forward instead. The literal +sentinel `"lastCompaction"` may be passed as `before`/`after`: `after: "lastCompaction"` +returns the messages since the last compaction (the active context), +`before: "lastCompaction"` the summarized-away history. Cursors are ephemeral — +a cursor whose message no longer exists (e.g. after a rollback or clear) returns +a `cursor_expired` error, prompting the client to refetch the latest page. + +_Request:_ + +* method: `chat/history` +* params: `ChatHistoryParams` defined as follows: + +```typescript +interface ChatHistoryParams { + /** The chat session identifier to fetch. */ + chatId: string; + + /** Optional. Max messages to return in the page. */ + limit?: number; + + /** Optional opaque cursor (or "lastCompaction") — page older. */ + before?: string; + + /** Optional opaque cursor (or "lastCompaction") — page newer. */ + after?: string; +} +``` + +_Response:_ + +```typescript +interface ChatHistoryResponse { + /** + * The page of history, as the same content items streamed by + * chat/contentReceived. Absent when `error` is present. + */ + contents?: ChatContentReceived[]; + + /** Pagination metadata. Absent when `error` is present. */ + meta?: ChatHistoryMeta; + + /** Present when the chat is unknown or a supplied cursor is stale. */ + error?: { code: 'chat_not_found' | 'cursor_expired'; message: string }; +} + +interface ChatHistoryMeta { + /** Total messages in the full history. */ + total: number; + + /** Number of messages in this page. */ + returned: number; + + /** Cursor to pass as `before` to load older; null at the start. */ + beforeCursor: string | null; + + /** Cursor to pass as `after` to load newer; null at the tail. */ + afterCursor: string | null; + + /** Cursor at the last compaction boundary; null when never compacted. */ + compactionCursor: string | null; } ``` diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index 9ca9fa71e..d2c0922fa 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -7,6 +7,7 @@ [eca.config :as config] [eca.db :as db] [eca.features.background-tasks :as bg] + [eca.features.chat.history :as history] [eca.features.chat.lifecycle :as lifecycle] [eca.features.chat.tool-calls :as tc] [eca.features.commands :as f.commands] @@ -255,26 +256,43 @@ :text (:text message-content) :contentId content-id}}])) +(defn messages->contents + "Pure transform of persisted `messages` into the flat sequence of chat-content + notification payloads ({:chat-id :role :content :parent-chat-id?}) that the + streaming replay sends, including nested subagent expansion. `db` is a plain + db snapshot used only to resolve subagent message histories. + + Shared by the streaming replay (`send-chat-contents!`) and the request/response + `chat/history` method so both render identical content." + [messages {:keys [chat-id parent-chat-id db]}] + (let [->payload (fn [{:keys [role content]}] + (assoc-some {:chat-id chat-id :role role :content content} + :parent-chat-id parent-chat-id))] + (into [] + (mapcat + (fn [message] + (let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message)) + subagent-chat-id (when (= "tool_call_output" (:role message)) + (get-in message [:content :details :subagent-chat-id])) + subagent-messages (when subagent-chat-id + (get-in db [:chats subagent-chat-id :messages]))] + (if (some? subagent-messages) + ;; For subagent tool calls: toolCallRun + toolCallRunning, then + ;; subagent messages, then toolCalled — matching live execution order. + (concat (map ->payload (butlast chat-contents)) + (messages->contents subagent-messages + {:chat-id subagent-chat-id + :parent-chat-id chat-id + :db db}) + [(->payload (last chat-contents))]) + (map ->payload chat-contents)))) + messages)))) + (defn ^:private send-chat-contents! [messages chat-ctx] - (doseq [message messages] - (let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message)) - subagent-chat-id (when (= "tool_call_output" (:role message)) - (get-in message [:content :details :subagent-chat-id]))] - (if-let [subagent-messages (when subagent-chat-id - (get-in @(:db* chat-ctx) [:chats subagent-chat-id :messages]))] - ;; For subagent tool calls: send toolCallRun + toolCallRunning, then - ;; subagent messages, then toolCalled — matching live execution order. - (let [before-called (butlast chat-contents) - called (last chat-contents)] - (doseq [{:keys [role content]} before-called] - (lifecycle/send-content! chat-ctx role content)) - (send-chat-contents! subagent-messages - (assoc chat-ctx - :chat-id subagent-chat-id - :parent-chat-id (:chat-id chat-ctx))) - (lifecycle/send-content! chat-ctx (:role called) (:content called))) - (doseq [{:keys [role content]} chat-contents] - (lifecycle/send-content! chat-ctx role content)))))) + (doseq [payload (messages->contents messages {:chat-id (:chat-id chat-ctx) + :parent-chat-id (:parent-chat-id chat-ctx) + :db @(:db* chat-ctx)})] + (messenger/chat-content-received (:messenger chat-ctx) payload))) (defn default-model [db config] (llm-api/default-model db config)) @@ -1906,14 +1924,30 @@ selected model to the resumed chat's stored `:model` via a `config/updated` notification, so an Opus-started chat keeps using Opus on the next prompt (#417). Performs no DB mutation otherwise. + + Optional `:limit`/`:before`/`:after` window the replay (same cursors as + `chat/history`); when provided, the response includes `:meta` so the client + can page older via `chat/history`. Without them the full history is replayed. Returns `{:found? false}` when the chat does not exist or is a subagent, - otherwise `{:found? true :chat-id ... :title ...}`." - [{:keys [chat-id]} db* messenger config] - (let [chat (get-in @db* [:chats chat-id])] - (if (or (nil? chat) (:subagent chat)) + otherwise `{:found? true :chat-id ... :title ... :meta? ...}`." + [{:keys [chat-id limit before after] :as params} db* messenger config] + (let [chat (get-in @db* [:chats chat-id]) + windowed? (some #(contains? params %) [:limit :before :after]) + page (when windowed? + (history/window-messages (vec (:messages chat)) + {:limit limit :before before :after after}))] + (cond + (or (nil? chat) (:subagent chat)) {:found? false} + + (= :cursor-expired (:error page)) + {:found? true :chat-id chat-id + :error {:code "cursor_expired" + :message "Cursor no longer points to an existing message; refetch the latest page"}} + + :else (let [title (:title chat) - messages (:messages chat) + messages (if windowed? (:messages page) (:messages chat)) chat-ctx {:chat-id chat-id :db* db* :messenger messenger}] (mark-editor-open! db* chat-id) (messenger/chat-cleared messenger {:chat-id chat-id :messages true}) @@ -1922,4 +1956,35 @@ (lifecycle/send-content! chat-ctx :system (assoc-some {:type :metadata} :title title)) (config/notify-selected-model-changed! (:model chat) db* messenger config (:variant chat)) (config/notify-selected-trust-changed! (:trust chat) db* messenger) - {:found? true :chat-id chat-id :title title})))) + (assoc-some {:found? true :chat-id chat-id :title title} + :meta (when windowed? + {:total (:total page) + :returned (:returned page) + :before-cursor (:before-cursor page) + :after-cursor (:after-cursor page) + :compaction-cursor (history/compaction-cursor (:messages chat))})))))) + +(defn fetch-history + "Window a chat's persisted messages and return the transformed content items + plus pagination meta, for the request/response `chat/history` method. + + `before`/`after` are opaque cursors (or the `lastCompaction` sentinel) and + `limit` bounds the page. Returns {:contents [...] :meta {...}}, or + {:error {:code :message}} when the chat is unknown or a cursor is stale." + [{:keys [chat-id limit before after]} db*] + (let [db @db* + chat (get-in db [:chats chat-id])] + (if (or (nil? chat) (:subagent chat)) + {:error {:code "chat_not_found" :message (str "Chat " chat-id " not found")}} + (let [messages (vec (:messages chat)) + result (history/window-messages messages {:limit limit :before before :after after})] + (if (= :cursor-expired (:error result)) + {:error {:code "cursor_expired" + :message "Cursor no longer points to an existing message; refetch the latest page"}} + {:contents (messages->contents (:messages result) + {:chat-id chat-id :parent-chat-id nil :db db}) + :meta {:total (:total result) + :returned (:returned result) + :before-cursor (:before-cursor result) + :after-cursor (:after-cursor result) + :compaction-cursor (history/compaction-cursor messages)}}))))) diff --git a/src/eca/features/chat/history.clj b/src/eca/features/chat/history.clj new file mode 100644 index 000000000..8d890f1af --- /dev/null +++ b/src/eca/features/chat/history.clj @@ -0,0 +1,122 @@ +(ns eca.features.chat.history + "Transport-agnostic windowing and cursor logic over a chat's persisted + messages, shared by the remote HTTP API and the JSON-RPC `chat/history` + method. + + Cursors are opaque tokens: base64url of \".\" where checksum + fingerprints the message. Resolving trusts the encoded index when its checksum + still matches, else rescans by checksum (tolerating shifts like flag + insert/remove); a cursor whose message is gone resolves to :expired." + (:require + [clojure.string :as string]) + (:import + [java.nio.charset StandardCharsets] + [java.util Base64])) + +(set! *warn-on-reflection* true) + +(def last-compaction-sentinel + "Literal cursor value resolving to the position of the last compaction marker." + "lastCompaction") + +(defn ^:private message-checksum + "Stable-enough fingerprint of a message to validate/resolve a cursor." + [message] + (Integer/toHexString (hash [(:role message) (:created-at message) (:content message)]))) + +(defn ^:private encode-cursor [idx message] + (let [raw (.getBytes (str idx "." (message-checksum message)) StandardCharsets/UTF_8)] + (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) raw))) + +(defn ^:private decode-cursor [^String cursor] + (try + (let [decoded (String. (.decode (Base64/getUrlDecoder) cursor) StandardCharsets/UTF_8) + [idx-s checksum] (string/split decoded #"\." 2)] + {:idx (Integer/parseInt idx-s) :checksum checksum}) + (catch Exception _ nil))) + +(defn ^:private last-compaction-idx + "Index of the last compact_marker message, or nil when never compacted." + [messages] + (loop [i (dec (count messages))] + (cond + (neg? i) nil + (= "compact_marker" (:role (nth messages i))) i + :else (recur (dec i))))) + +(defn ^:private resolve-cursor + "Resolves an opaque cursor to an index, or :expired if its message is gone." + [messages ^String cursor] + (if-let [{:keys [idx checksum]} (decode-cursor cursor)] + (if (and (<= 0 idx) (< idx (count messages)) + (= checksum (message-checksum (nth messages idx)))) + idx + (or (first (keep-indexed (fn [i m] (when (= checksum (message-checksum m)) i)) messages)) + :expired)) + :expired)) + +(defn ^:private resolve-bound + "Resolves a `before`/`after` value to an index, :expired, or nil (absent). + The `lastCompaction` sentinel resolves to the last compaction marker, or + :no-compaction when the chat was never compacted." + [messages value] + (when value + (if (= value last-compaction-sentinel) + (or (last-compaction-idx messages) :no-compaction) + (resolve-cursor messages value)))) + +(defn ^:private coerce-limit + "Coerces a limit that may arrive as an int (JSON-RPC) or string (HTTP query)." + [value] + (when value + (try + (let [n (if (integer? value) value (Integer/parseInt (str value)))] + (when (pos? n) n)) + (catch Exception _ nil)))) + +(defn window-messages + "Windows `messages` by the (after, before) cursors and `limit`. + + `after` is an exclusive lower bound, `before` an exclusive upper bound. The + page is the newest `limit` messages in the window; an opaque `after` cursor as + the only bound pages forward (oldest `limit`). `before-cursor`/`after-cursor` + are computed against full history so paging crosses the compaction boundary, + and are nil at the ends. + + Returns {:messages :before-cursor :after-cursor :total :returned} or + {:error :cursor-expired}." + [messages {:keys [limit before after]}] + (let [messages (vec messages) + n (count messages) + limit (coerce-limit limit) + after-res (resolve-bound messages after) + before-res (resolve-bound messages before)] + (if (or (= :expired after-res) (= :expired before-res)) + {:error :cursor-expired} + (let [lo (if (integer? after-res) after-res -1) ;; exclusive lower + hi (if (integer? before-res) before-res n) ;; exclusive upper + win-start (max 0 (min (inc lo) n)) + win-end (max win-start (min hi n)) + ;; Opaque `after` cursor pages forward; the sentinel stays newest-anchored. + forward? (and (integer? after-res) + (not= after last-compaction-sentinel) + (nil? before-res)) + [slice-start slice-end] + (cond + (nil? limit) [win-start win-end] + forward? [win-start (min win-end (+ win-start limit))] + :else [(max win-start (- win-end limit)) win-end]) + slice (subvec messages slice-start slice-end)] + {:messages slice + :total n + :returned (count slice) + :before-cursor (when (pos? slice-start) + (encode-cursor slice-start (nth messages slice-start))) + :after-cursor (when (< slice-end n) + (encode-cursor (dec slice-end) (nth messages (dec slice-end))))})))) + +(defn compaction-cursor + "Opaque cursor for the last compaction boundary, or nil when never compacted." + [messages] + (when-let [idx (last-compaction-idx (vec messages))] + (encode-cursor idx (nth (vec messages) idx)))) diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index 594441e88..1e298535d 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -289,6 +289,15 @@ (metrics/task metrics :eca/chat-open (f.chat/open-chat! params db* messenger config))) +(defn chat-history + "Fetch a window of a chat's history (request/response, no streaming). + Supports optional :limit/:before/:after for pagination (see + `f.chat/fetch-history`). Returns `{:contents [...] :meta {...}}` or + `{:error {:code :message}}`." + [{:keys [db* metrics]} params] + (metrics/task metrics :eca/chat-history + (f.chat/fetch-history params db*))) + (defn mcp-stop-server [{:keys [db* messenger metrics config]} params] (metrics/task metrics :eca/mcp-stop-server (f.tools/stop-server! (:name params) db* messenger config metrics))) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index fdc8e6e55..cfbf4e1c6 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -7,6 +7,7 @@ [clojure.core.async :as async] [eca.config :as config] [eca.features.chat :as f.chat] + [eca.features.chat.history :as f.chat.history] [eca.handlers :as handlers] [eca.remote.messenger :as remote.messenger] [eca.remote.sse :as sse] @@ -163,25 +164,47 @@ (mapv chat-summary))] (json-response chats))) -(defn handle-get-chat [{:keys [db*]} _request chat-id] +(defn handle-get-chat [{:keys [db*]} request chat-id] (if-let [chat (chat-or-404 db* chat-id)] (let [db @db* - config (config/all db)] - (json-response - (camel-keys - {:id (:id chat) - :title (:title chat) - :status (or (:status chat) :idle) - :created-at (:created-at chat) - :updated-at (:updated-at chat) - :messages (or (:messages chat) []) - :task (:task chat) - :pending-tool-calls (pending-tool-calls chat) - ;; Effective selection: per-chat override if set, otherwise the - ;; resolved session-level default. :variant may legitimately be nil. - :model (or (:model chat) (resolve-default-model db config)) - :variant (or (:variant chat) (resolve-default-variant db)) - :agent (or (:agent chat) (resolve-default-agent db config))}))) + config (config/all db) + params (:params request) + paginate? (some #(contains? params %) [:limit :before :after]) + all-messages (vec (or (:messages chat) [])) + page (when paginate? + (f.chat.history/window-messages all-messages + {:limit (:limit params) + :before (:before params) + :after (:after params)})) + base {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :updated-at (:updated-at chat) + :messages (if paginate? (:messages page) all-messages) + :task (:task chat) + :pending-tool-calls (pending-tool-calls chat) + ;; Effective selection: per-chat override if set, otherwise the + ;; resolved session-level default. :variant may legitimately be nil. + :model (or (:model chat) (resolve-default-model db config)) + :variant (or (:variant chat) (resolve-default-variant db)) + :agent (or (:agent chat) (resolve-default-agent db config))}] + (cond + (= :cursor-expired (:error page)) + (error-response 409 "cursor_expired" + "Cursor no longer points to an existing message; refetch the latest page") + + paginate? + (json-response + (camel-keys + (assoc base :messages-meta {:total (:total page) + :returned (:returned page) + :before-cursor (:before-cursor page) + :after-cursor (:after-cursor page) + :compaction-cursor (f.chat.history/compaction-cursor all-messages)}))) + + :else + (json-response (camel-keys base)))) (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")))) (defn handle-prompt [{:keys [db*] :as components} request chat-id] diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj index b5bcdbc88..3ee9a6ea9 100644 --- a/src/eca/remote/routes.clj +++ b/src/eca/remote/routes.clj @@ -5,7 +5,9 @@ [clojure.string :as string] [eca.remote.auth :as auth] [eca.remote.handlers :as handlers] - [eca.remote.middleware :as middleware])) + [eca.remote.middleware :as middleware] + [ring.middleware.keyword-params :refer [wrap-keyword-params]] + [ring.middleware.params :refer [wrap-params]])) (set! *warn-on-reflection* true) @@ -130,5 +132,7 @@ {:error {:code "internal_error" :message (or (.getMessage e) "Unknown error")}})}))) (not-found-response))) + (wrap-keyword-params) + (wrap-params) (auth/wrap-bearer-auth token ["/" "/api/v1/health"]) (middleware/wrap-cors)))) diff --git a/src/eca/server.clj b/src/eca/server.clj index c438bfc78..f9a54acca 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -187,6 +187,9 @@ (defmethod jsonrpc.server/receive-request "chat/open" [_ components params] (eventually (handlers/chat-open (with-config components) params))) +(defmethod jsonrpc.server/receive-request "chat/history" [_ components params] + (eventually (handlers/chat-history (with-config components) params))) + (defmethod jsonrpc.server/receive-notification "mcp/stopServer" [_ components params] (async-notify (handlers/mcp-stop-server (with-config components) params))) diff --git a/test/eca/features/chat/history_test.clj b/test/eca/features/chat/history_test.clj new file mode 100644 index 000000000..3a4dc5fe4 --- /dev/null +++ b/test/eca/features/chat/history_test.clj @@ -0,0 +1,80 @@ +(ns eca.features.chat.history-test + (:require + [clojure.test :refer [deftest is testing]] + [eca.features.chat.history :as f.chat.history])) + +(defn- msg [role text created-at] + {:role role :content [{:type :text :text text}] :created-at created-at}) + +(def ^:private messages + [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + (msg "user" "m2" 3) + (msg "assistant" "m3" 4) + (msg "user" "m4" 5)]) + +(defn- texts [result] + (mapv #(get-in % [:content 0 :text]) (:messages result))) + +(deftest window-messages-test + (testing "limit returns the newest N messages" + (let [r (f.chat.history/window-messages messages {:limit 2})] + (is (= ["m3" "m4"] (texts r))) + (is (= 5 (:total r))) + (is (= 2 (:returned r))) + (is (some? (:before-cursor r))) + (is (nil? (:after-cursor r))))) + + (testing "limit accepts string (HTTP) and int (JSON-RPC)" + (is (= ["m3" "m4"] (texts (f.chat.history/window-messages messages {:limit "2"})))) + (is (= ["m3" "m4"] (texts (f.chat.history/window-messages messages {:limit 2}))))) + + (testing "no limit returns the whole window" + (let [r (f.chat.history/window-messages messages {})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts r))) + (is (nil? (:before-cursor r))) + (is (nil? (:after-cursor r))))) + + (testing "before cursor loads the older page" + (let [before (:before-cursor (f.chat.history/window-messages messages {:limit 2})) + r (f.chat.history/window-messages messages {:limit 2 :before before})] + (is (= ["m1" "m2"] (texts r))) + (is (some? (:before-cursor r))) + (is (some? (:after-cursor r))))) + + (testing "opaque after cursor pages forward" + (let [first-cursor (:before-cursor + (f.chat.history/window-messages messages {:limit 4})) + ;; first-cursor points at m1 (index 1); forward page after it + r (f.chat.history/window-messages messages {:limit 2 :after first-cursor})] + (is (= ["m2" "m3"] (texts r))))) + + (testing "expired cursor returns error" + (let [r (f.chat.history/window-messages messages {:before "not-a-real-cursor"})] + (is (= :cursor-expired (:error r)))))) + +(deftest compaction-window-test + (let [with-marker [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + {:role "compact_marker" :content {:auto? false} :created-at 3} + (msg "user" "summary" 4) + (msg "assistant" "m4" 5)]] + (testing "after=lastCompaction returns post-compaction messages, newest-anchored" + (let [r (f.chat.history/window-messages with-marker {:after f.chat.history/last-compaction-sentinel})] + (is (= ["summary" "m4"] (texts r))) + (is (nil? (:after-cursor r))) + (is (some? (:before-cursor r)))) + (let [r (f.chat.history/window-messages with-marker {:after f.chat.history/last-compaction-sentinel :limit 1})] + (is (= ["m4"] (texts r))))) + + (testing "before=lastCompaction returns the summarized-away history" + (let [r (f.chat.history/window-messages with-marker {:before f.chat.history/last-compaction-sentinel})] + (is (= ["m0" "m1"] (texts r))))) + + (testing "compaction-cursor is present only when compacted" + (is (some? (f.chat.history/compaction-cursor with-marker))) + (is (nil? (f.chat.history/compaction-cursor messages)))) + + (testing "lastCompaction with no marker falls back to the full window" + (let [r (f.chat.history/window-messages messages {:after f.chat.history/last-compaction-sentinel})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts r))))))) diff --git a/test/eca/features/chat_test.clj b/test/eca/features/chat_test.clj index 75a7376b0..f589d617a 100644 --- a/test/eca/features/chat_test.clj +++ b/test/eca/features/chat_test.clj @@ -1508,6 +1508,87 @@ (is (match? {:config-updated [{:chat {:select-trust false}}]} (h/messages)))))) +(deftest fetch-history-test + (let [text-msg (fn [text n] {:role "user" :content [{:type :text :text text}] :created-at n}) + seed! (fn [chat-id msgs] + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats chat-id] {:id chat-id :messages msgs}) + chat-id) + content-texts (fn [{:keys [contents]}] + (keep #(some-> (get-in % [:content :text]) string/trim) contents))] + (testing "unknown chat returns an error" + (h/reset-components!) + (is (= "chat_not_found" (get-in (f.chat/fetch-history {:chat-id "nope"} (h/db*)) [:error :code])))) + + (testing "subagent chat is not fetchable" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "sub"] {:id "sub" :subagent true :messages []}) + (is (= "chat_not_found" (get-in (f.chat/fetch-history {:chat-id "sub"} (h/db*)) [:error :code])))) + + (testing "returns transformed content items plus meta" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) (text-msg "m1" 2) (text-msg "m2" 3)]) + result (f.chat/fetch-history {:chat-id chat-id :limit 2} (h/db*))] + (is (= ["m1" "m2"] (content-texts result))) + (is (every? #(= chat-id (:chat-id %)) (:contents result))) + (is (= 3 (get-in result [:meta :total]))) + (is (= 2 (get-in result [:meta :returned]))) + (is (some? (get-in result [:meta :before-cursor]))) + (is (nil? (get-in result [:meta :after-cursor]))))) + + (testing "before cursor pages older" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) (text-msg "m1" 2) (text-msg "m2" 3)]) + before (get-in (f.chat/fetch-history {:chat-id chat-id :limit 2} (h/db*)) [:meta :before-cursor]) + result (f.chat/fetch-history {:chat-id chat-id :limit 2 :before before} (h/db*))] + ;; first page was [m1 m2]; older-than-m1 is just [m0] + (is (= ["m0"] (content-texts result))))) + + (testing "after=lastCompaction returns the active context with a compaction cursor" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) + {:role "compact_marker" :content {:auto? false} :created-at 2} + (text-msg "summary" 3) + (text-msg "m3" 4)]) + result (f.chat/fetch-history {:chat-id chat-id :after "lastCompaction"} (h/db*))] + (is (= ["summary" "m3"] (content-texts result))) + (is (some? (get-in result [:meta :compaction-cursor]))))) + + (testing "stale cursor returns cursor_expired" + (let [chat-id (seed! "c1" [(text-msg "m0" 1)])] + (is (= "cursor_expired" + (get-in (f.chat/fetch-history {:chat-id chat-id :before "bogus"} (h/db*)) [:error :code]))))))) + +(deftest open-chat-windowed-test + (let [msg (fn [t n] {:role "user" :content [{:type :text :text t}] :created-at n}) + replayed-texts (fn [] + (->> (h/messages) :chat-content-received + (keep #(some-> (get-in % [:content :text]) string/trim)) + vec))] + (testing "no window params replays the full history with no meta" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "full"] + {:id "full" :messages [(msg "m0" 1) (msg "m1" 2) (msg "m2" 3)]}) + (let [result (f.chat/open-chat! {:chat-id "full"} (h/db*) (h/messenger) (h/config))] + (is (nil? (:meta result))) + (is (= ["m0" "m1" "m2"] (replayed-texts))))) + + (testing "limit replays only the window and returns meta" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "win"] + {:id "win" :messages [(msg "m0" 1) (msg "m1" 2) (msg "m2" 3)]}) + (let [result (f.chat/open-chat! {:chat-id "win" :limit 2} (h/db*) (h/messenger) (h/config))] + (is (match? {:found? true :chat-id "win" + :meta {:total 3 :returned 2 :after-cursor nil}} + result)) + (is (some? (get-in result [:meta :before-cursor]))) + (is (= ["m1" "m2"] (replayed-texts))))) + + (testing "expired cursor errors before any replay" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "exp"] + {:id "exp" :messages [(msg "m0" 1)]}) + (let [result (f.chat/open-chat! {:chat-id "exp" :before "bogus"} (h/db*) (h/messenger) (h/config))] + (is (= "cursor_expired" (get-in result [:error :code]))) + (is (= {} (h/messages)) "no notifications emitted on expired cursor"))))) + (deftest prompt-cache-agent-and-model-switch-test (testing "local static prompt cache is reused only when both agent and model match" (h/reset-components!) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index dbbf2cce5..7077ddc59 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -3,6 +3,7 @@ [cheshire.core :as json] [clojure.test :refer [deftest is testing]] [eca.config :as config] + [eca.features.chat.history :as f.chat.history] [eca.messenger :as messenger] [eca.remote.handlers :as handlers] [eca.remote.messenger :as remote.messenger] @@ -218,10 +219,112 @@ body (json/parse-string (:body response) true)] (is (= "chat-model" (:model body))) (is (= "chat-agent" (:agent body))) - ;; variant has no per-chat override, so it falls back to the session value - (is (= "session-variant" (:variant body)))))) - -(deftest handle-stop-test + ;; variant has no per-chat override, so it falls back to the session value + (is (= "session-variant" (:variant body)))))) + + (defn- msg [role text created-at] + {:role role :content [{:type :text :text text}] :created-at created-at}) + + (defn- texts [body] + (mapv #(get-in % [:content 0 :text]) (:messages body))) + + (defn- get-chat [params] + (let [response (handlers/handle-get-chat (components) {:params params} "c1")] + (assoc response :parsed (json/parse-string (:body response) true)))) + + (deftest handle-get-chat-pagination-test + (let [messages [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + (msg "user" "m2" 3) + (msg "assistant" "m3" 4) + (msg "user" "m4" 5)]] + + (testing "no query params returns the legacy shape: full messages, no messagesMeta" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts parsed))) + (is (not (contains? parsed :messagesMeta))))) + + (testing "limit returns the newest N messages with meta and cursors" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:limit "2"}) + meta (:messagesMeta parsed)] + (is (= ["m3" "m4"] (texts parsed))) + (is (= 5 (:total meta))) + (is (= 2 (:returned meta))) + (is (some? (:beforeCursor meta)) "older messages exist -> beforeCursor present") + (is (nil? (:afterCursor meta)) "at the tail -> afterCursor nil") + (is (nil? (:compactionCursor meta)) "never compacted -> compactionCursor nil"))) + + (testing "before cursor loads the previous (older) page" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [before-cursor (get-in (get-chat {:limit "2"}) [:parsed :messagesMeta :beforeCursor]) + {:keys [parsed]} (get-chat {:limit "2" :before before-cursor}) + meta (:messagesMeta parsed)] + (is (= ["m1" "m2"] (texts parsed))) + (is (some? (:beforeCursor meta)) "more older messages still exist") + (is (some? (:afterCursor meta)) "newer messages exist -> afterCursor present"))) + + (testing "opaque after cursor forward-pages the messages right after it" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [after-cursor (#'f.chat.history/encode-cursor 0 (nth messages 0)) + {:keys [parsed]} (get-chat {:limit "2" :after after-cursor})] + (is (= ["m1" "m2"] (texts parsed))))) + + (testing "nil-punning: beforeCursor nil at the start of history" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [meta (get-in (get-chat {:limit "10"}) [:parsed :messagesMeta])] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts (:parsed (get-chat {:limit "10"}))))) + (is (nil? (:beforeCursor meta))) + (is (nil? (:afterCursor meta))))) + + (testing "expired cursor returns 409" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [bogus (#'f.chat.history/encode-cursor 99 {:role "x" :content "gone" :created-at 999999}) + response (handlers/handle-get-chat (components) {:params {:before bogus}} "c1") + body (json/parse-string (:body response) true)] + (is (= 409 (:status response))) + (is (= "cursor_expired" (get-in body [:error :code]))))))) + + (deftest handle-get-chat-compaction-window-test + ;; indices: 0 user, 1 assistant, 2 compact_marker, 3 user(summary), 4 assistant + (let [messages [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + {:role "compact_marker" :content {:auto? false} :created-at 3} + (msg "user" "summary" 4) + (msg "assistant" "m4" 5)]] + + (testing "after=lastCompaction returns only post-compaction messages, newest-anchored" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction"}) + meta (:messagesMeta parsed)] + (is (= ["summary" "m4"] (texts parsed)) "pre-compaction messages excluded") + (is (nil? (:afterCursor meta)) "at the tail") + (is (some? (:beforeCursor meta)) "can still page across the compaction boundary") + (is (some? (:compactionCursor meta))))) + + (testing "after=lastCompaction honors limit" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction" :limit "1"})] + (is (= ["m4"] (texts parsed))))) + + (testing "before=lastCompaction returns the summarized-away history" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:before "lastCompaction"}) + meta (:messagesMeta parsed)] + (is (= ["m0" "m1"] (texts parsed))) + (is (nil? (:beforeCursor meta))) + (is (some? (:afterCursor meta))))) + + (testing "after=lastCompaction with no compaction marker falls back to full window" + (swap! (h/db*) assoc-in [:chats "c1"] + {:id "c1" :title "T" :status :idle + :messages [(msg "user" "a" 1) (msg "assistant" "b" 2)]}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction"})] + (is (= ["a" "b"] (texts parsed))) + (is (nil? (get-in parsed [:messagesMeta :compactionCursor]))))))) + + (deftest handle-stop-test (testing "returns 404 for missing chat" (let [response (handlers/handle-stop (components) nil "nonexistent")] (is (= 404 (:status response)))))