From 90c62fe5c52d002624cedc9c3a75f73301a854af Mon Sep 17 00:00:00 2001 From: Ryan Schmukler Date: Fri, 12 Jun 2026 12:10:53 -0400 Subject: [PATCH 1/2] Add opt-in message pagination to remote chat HTTP API GET /api/v1/chats/:id accepts limit/before/after query params with opaque cursors and an after=lastCompaction sentinel, returning a messagesMeta object with before/after/compaction cursors. --- CHANGELOG.md | 2 + src/eca/remote/handlers.clj | 169 ++++++++++++++++++++++++++---- src/eca/remote/routes.clj | 6 +- test/eca/remote/handlers_test.clj | 110 ++++++++++++++++++- 4 files changed, 264 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 842958a58..d5cb9ab2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Remote HTTP `GET /api/v1/chats/:id`: opt-in message pagination via `limit`/`before`/`after` query params (opaque cursors, `after=lastCompaction` sentinel) returning `messagesMeta` with before/after/compaction cursors. + ## 0.140.1 - MCP: recover remote servers with stale connections (e.g. after suspend): tool-call timeouts now probe and re-initialize; add `mcpKeepAliveSeconds` pings; 404 triggers re-init. diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index fdc8e6e55..2927a7135 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -5,6 +5,7 @@ (:require [cheshire.core :as json] [clojure.core.async :as async] + [clojure.string :as string] [eca.config :as config] [eca.features.chat :as f.chat] [eca.handlers :as handlers] @@ -14,7 +15,9 @@ [ring.core.protocols :as ring.protocols]) (:import [java.io InputStream] - [java.time Instant])) + [java.nio.charset StandardCharsets] + [java.time Instant] + [java.util Base64])) (set! *warn-on-reflection* true) @@ -163,25 +166,155 @@ (mapv chat-summary))] (json-response chats))) -(defn handle-get-chat [{:keys [db*]} _request chat-id] +;; --- Message pagination (opt-in via `limit`/`before`/`after` query params) ----- +;; +;; Cursors are opaque tokens: base64url of "." where checksum +;; fingerprints the message. Resolving trusts the index when its checksum still +;; matches, else rescans by checksum (tolerating shifts like flag insert/remove); +;; a cursor whose message is gone resolves to :expired (-> 409). + +(def ^:private last-compaction-sentinel + "Literal cursor value resolving to the position of the last compaction marker." + "lastCompaction") + +(defn ^:private message-checksum + "Stable-enough fingerprint of a message to validate/resolve a cursor." + [message] + (Integer/toHexString (hash [(:role message) (:created-at message) (:content message)]))) + +(defn ^:private encode-cursor [idx message] + (let [raw (.getBytes (str idx "." (message-checksum message)) StandardCharsets/UTF_8)] + (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) raw))) + +(defn ^:private decode-cursor [^String cursor] + (try + (let [decoded (String. (.decode (Base64/getUrlDecoder) cursor) StandardCharsets/UTF_8) + [idx-s checksum] (string/split decoded #"\." 2)] + {:idx (Integer/parseInt idx-s) :checksum checksum}) + (catch Exception _ nil))) + +(defn ^:private last-compaction-idx + "Index of the last compact_marker message, or nil when never compacted." + [messages] + (loop [i (dec (count messages))] + (cond + (neg? i) nil + (= "compact_marker" (:role (nth messages i))) i + :else (recur (dec i))))) + +(defn ^:private resolve-cursor + "Resolves an opaque cursor to an index, or :expired if its message is gone." + [messages ^String cursor] + (if-let [{:keys [idx checksum]} (decode-cursor cursor)] + (if (and (<= 0 idx) (< idx (count messages)) + (= checksum (message-checksum (nth messages idx)))) + idx + (or (first (keep-indexed (fn [i m] (when (= checksum (message-checksum m)) i)) messages)) + :expired)) + :expired)) + +(defn ^:private resolve-bound + "Resolves a `before`/`after` query value to an index, :expired, or nil (absent). + The `lastCompaction` sentinel resolves to the last compaction marker, or + :no-compaction when the chat was never compacted." + [messages value] + (when value + (if (= value last-compaction-sentinel) + (or (last-compaction-idx messages) :no-compaction) + (resolve-cursor messages value)))) + +(defn ^:private parse-limit [value] + (when value + (try + (let [n (Integer/parseInt (str value))] + (when (pos? n) n)) + (catch Exception _ nil)))) + +(defn ^:private paginate-messages + "Windows `messages` by the (after, before) cursors and `limit`. + + `after` is an exclusive lower bound, `before` an exclusive upper bound. The + page is the newest `limit` messages in the window; an opaque `after` cursor as + the only bound pages forward (oldest `limit`). `before-cursor`/`after-cursor` + are computed against full history so paging crosses the compaction boundary, + and are nil at the ends. + + Returns {:messages :before-cursor :after-cursor :total :returned} or + {:error :cursor-expired}." + [messages {:keys [limit before after]}] + (let [messages (vec messages) + n (count messages) + after-res (resolve-bound messages after) + before-res (resolve-bound messages before)] + (if (or (= :expired after-res) (= :expired before-res)) + {:error :cursor-expired} + (let [lo (if (integer? after-res) after-res -1) ;; exclusive lower + hi (if (integer? before-res) before-res n) ;; exclusive upper + win-start (max 0 (min (inc lo) n)) + win-end (max win-start (min hi n)) + ;; Opaque `after` cursor pages forward; the sentinel stays newest-anchored. + forward? (and (integer? after-res) + (not= after last-compaction-sentinel) + (nil? before-res)) + [slice-start slice-end] + (cond + (nil? limit) [win-start win-end] + forward? [win-start (min win-end (+ win-start limit))] + :else [(max win-start (- win-end limit)) win-end]) + slice (subvec messages slice-start slice-end)] + {:messages slice + :total n + :returned (count slice) + :before-cursor (when (pos? slice-start) + (encode-cursor slice-start (nth messages slice-start))) + :after-cursor (when (< slice-end n) + (encode-cursor (dec slice-end) (nth messages (dec slice-end))))})))) + +(defn ^:private compaction-cursor [messages] + (when-let [idx (last-compaction-idx messages)] + (encode-cursor idx (nth messages idx)))) + +(defn handle-get-chat [{:keys [db*]} request chat-id] (if-let [chat (chat-or-404 db* chat-id)] (let [db @db* - config (config/all db)] - (json-response - (camel-keys - {:id (:id chat) - :title (:title chat) - :status (or (:status chat) :idle) - :created-at (:created-at chat) - :updated-at (:updated-at chat) - :messages (or (:messages chat) []) - :task (:task chat) - :pending-tool-calls (pending-tool-calls chat) - ;; Effective selection: per-chat override if set, otherwise the - ;; resolved session-level default. :variant may legitimately be nil. - :model (or (:model chat) (resolve-default-model db config)) - :variant (or (:variant chat) (resolve-default-variant db)) - :agent (or (:agent chat) (resolve-default-agent db config))}))) + config (config/all db) + params (:params request) + paginate? (some #(contains? params %) [:limit :before :after]) + all-messages (vec (or (:messages chat) [])) + page (when paginate? + (paginate-messages all-messages + {:limit (parse-limit (:limit params)) + :before (:before params) + :after (:after params)})) + base {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :updated-at (:updated-at chat) + :messages (if paginate? (:messages page) all-messages) + :task (:task chat) + :pending-tool-calls (pending-tool-calls chat) + ;; Effective selection: per-chat override if set, otherwise the + ;; resolved session-level default. :variant may legitimately be nil. + :model (or (:model chat) (resolve-default-model db config)) + :variant (or (:variant chat) (resolve-default-variant db)) + :agent (or (:agent chat) (resolve-default-agent db config))}] + (cond + (= :cursor-expired (:error page)) + (error-response 409 "cursor_expired" + "Cursor no longer points to an existing message; refetch the latest page") + + paginate? + (json-response + (camel-keys + (assoc base :messages-meta {:total (:total page) + :returned (:returned page) + :before-cursor (:before-cursor page) + :after-cursor (:after-cursor page) + :compaction-cursor (compaction-cursor all-messages)}))) + + :else + (json-response (camel-keys base)))) (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")))) (defn handle-prompt [{:keys [db*] :as components} request chat-id] diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj index b5bcdbc88..3ee9a6ea9 100644 --- a/src/eca/remote/routes.clj +++ b/src/eca/remote/routes.clj @@ -5,7 +5,9 @@ [clojure.string :as string] [eca.remote.auth :as auth] [eca.remote.handlers :as handlers] - [eca.remote.middleware :as middleware])) + [eca.remote.middleware :as middleware] + [ring.middleware.keyword-params :refer [wrap-keyword-params]] + [ring.middleware.params :refer [wrap-params]])) (set! *warn-on-reflection* true) @@ -130,5 +132,7 @@ {:error {:code "internal_error" :message (or (.getMessage e) "Unknown error")}})}))) (not-found-response))) + (wrap-keyword-params) + (wrap-params) (auth/wrap-bearer-auth token ["/" "/api/v1/health"]) (middleware/wrap-cors)))) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index dbbf2cce5..5106ee784 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -218,10 +218,112 @@ body (json/parse-string (:body response) true)] (is (= "chat-model" (:model body))) (is (= "chat-agent" (:agent body))) - ;; variant has no per-chat override, so it falls back to the session value - (is (= "session-variant" (:variant body)))))) - -(deftest handle-stop-test + ;; variant has no per-chat override, so it falls back to the session value + (is (= "session-variant" (:variant body)))))) + + (defn- msg [role text created-at] + {:role role :content [{:type :text :text text}] :created-at created-at}) + + (defn- texts [body] + (mapv #(get-in % [:content 0 :text]) (:messages body))) + + (defn- get-chat [params] + (let [response (handlers/handle-get-chat (components) {:params params} "c1")] + (assoc response :parsed (json/parse-string (:body response) true)))) + + (deftest handle-get-chat-pagination-test + (let [messages [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + (msg "user" "m2" 3) + (msg "assistant" "m3" 4) + (msg "user" "m4" 5)]] + + (testing "no query params returns the legacy shape: full messages, no messagesMeta" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts parsed))) + (is (not (contains? parsed :messagesMeta))))) + + (testing "limit returns the newest N messages with meta and cursors" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:limit "2"}) + meta (:messagesMeta parsed)] + (is (= ["m3" "m4"] (texts parsed))) + (is (= 5 (:total meta))) + (is (= 2 (:returned meta))) + (is (some? (:beforeCursor meta)) "older messages exist -> beforeCursor present") + (is (nil? (:afterCursor meta)) "at the tail -> afterCursor nil") + (is (nil? (:compactionCursor meta)) "never compacted -> compactionCursor nil"))) + + (testing "before cursor loads the previous (older) page" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [before-cursor (get-in (get-chat {:limit "2"}) [:parsed :messagesMeta :beforeCursor]) + {:keys [parsed]} (get-chat {:limit "2" :before before-cursor}) + meta (:messagesMeta parsed)] + (is (= ["m1" "m2"] (texts parsed))) + (is (some? (:beforeCursor meta)) "more older messages still exist") + (is (some? (:afterCursor meta)) "newer messages exist -> afterCursor present"))) + + (testing "opaque after cursor forward-pages the messages right after it" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [after-cursor (#'handlers/encode-cursor 0 (nth messages 0)) + {:keys [parsed]} (get-chat {:limit "2" :after after-cursor})] + (is (= ["m1" "m2"] (texts parsed))))) + + (testing "nil-punning: beforeCursor nil at the start of history" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [meta (get-in (get-chat {:limit "10"}) [:parsed :messagesMeta])] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts (:parsed (get-chat {:limit "10"}))))) + (is (nil? (:beforeCursor meta))) + (is (nil? (:afterCursor meta))))) + + (testing "expired cursor returns 409" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [bogus (#'handlers/encode-cursor 99 {:role "x" :content "gone" :created-at 999999}) + response (handlers/handle-get-chat (components) {:params {:before bogus}} "c1") + body (json/parse-string (:body response) true)] + (is (= 409 (:status response))) + (is (= "cursor_expired" (get-in body [:error :code]))))))) + + (deftest handle-get-chat-compaction-window-test + ;; indices: 0 user, 1 assistant, 2 compact_marker, 3 user(summary), 4 assistant + (let [messages [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + {:role "compact_marker" :content {:auto? false} :created-at 3} + (msg "user" "summary" 4) + (msg "assistant" "m4" 5)]] + + (testing "after=lastCompaction returns only post-compaction messages, newest-anchored" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction"}) + meta (:messagesMeta parsed)] + (is (= ["summary" "m4"] (texts parsed)) "pre-compaction messages excluded") + (is (nil? (:afterCursor meta)) "at the tail") + (is (some? (:beforeCursor meta)) "can still page across the compaction boundary") + (is (some? (:compactionCursor meta))))) + + (testing "after=lastCompaction honors limit" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction" :limit "1"})] + (is (= ["m4"] (texts parsed))))) + + (testing "before=lastCompaction returns the summarized-away history" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) + (let [{:keys [parsed]} (get-chat {:before "lastCompaction"}) + meta (:messagesMeta parsed)] + (is (= ["m0" "m1"] (texts parsed))) + (is (nil? (:beforeCursor meta))) + (is (some? (:afterCursor meta))))) + + (testing "after=lastCompaction with no compaction marker falls back to full window" + (swap! (h/db*) assoc-in [:chats "c1"] + {:id "c1" :title "T" :status :idle + :messages [(msg "user" "a" 1) (msg "assistant" "b" 2)]}) + (let [{:keys [parsed]} (get-chat {:after "lastCompaction"})] + (is (= ["a" "b"] (texts parsed))) + (is (nil? (get-in parsed [:messagesMeta :compactionCursor]))))))) + + (deftest handle-stop-test (testing "returns 404 for missing chat" (let [response (handlers/handle-stop (components) nil "nonexistent")] (is (= 404 (:status response))))) From 0689f7ff8b5e2e520d3c7a772bb0bc903cd4bde9 Mon Sep 17 00:00:00 2001 From: Ryan Schmukler Date: Fri, 12 Jun 2026 18:09:51 -0400 Subject: [PATCH 2/2] Add chat/history method and shared message pagination core Fetching a chat's history previously had no pull API on the editor (JSON-RPC) side: chat/open always replayed the entire history as a stream, and the only windowed access lived on the remote HTTP endpoint. For long chats that means streaming tens of thousands of messages on every open, and the optimization was reachable only by the web client. Unify both transports around one transport-agnostic windowing core (eca.features.chat.history) that operates on raw persisted messages: opaque base64url "index.checksum" cursors that resolve by index when the checksum still matches and rescan by checksum otherwise (tolerating flag-insert/remove shifts), a lastCompaction sentinel, and a compaction-cursor. Pages are newest-anchored so opening a chat shows the latest context; an opaque after cursor pages forward instead. before/ after cursors are computed against full history so paging transparently crosses the compaction boundary, and are nil at the ends. On top of the core: - New JSON-RPC chat/history request returns a window inline as {contents, meta} where contents are the same ChatContent items chat/contentReceived streams, so editors reuse their renderer. Inline (not streamed) so the client can place an older page deterministically while live updates keep arriving on the stream. - chat/open gains optional limit/before/after to replay only a window and return meta; no params replays the full history as before. - Remote HTTP GET /api/v1/chats/:id keeps its opt-in query-param pagination, now backed by the shared core. To feed both the stream and the inline response from one place, the message->wire-content transform (including subagent expansion) is lifted out of send-chat-contents! into a pure messages->contents collector, so replay behavior is unchanged. --- CHANGELOG.md | 2 +- docs/protocol.md | 95 ++++++++++++++++++ src/eca/features/chat.clj | 115 +++++++++++++++++----- src/eca/features/chat/history.clj | 122 +++++++++++++++++++++++ src/eca/handlers.clj | 9 ++ src/eca/remote/handlers.clj | 124 ++---------------------- src/eca/server.clj | 3 + test/eca/features/chat/history_test.clj | 80 +++++++++++++++ test/eca/features/chat_test.clj | 81 ++++++++++++++++ test/eca/remote/handlers_test.clj | 5 +- 10 files changed, 491 insertions(+), 145 deletions(-) create mode 100644 src/eca/features/chat/history.clj create mode 100644 test/eca/features/chat/history_test.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index d5cb9ab2d..a01c3e183 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Unreleased -- Remote HTTP `GET /api/v1/chats/:id`: opt-in message pagination via `limit`/`before`/`after` query params (opaque cursors, `after=lastCompaction` sentinel) returning `messagesMeta` with before/after/compaction cursors. +- Add message pagination over a shared cursor core: new JSON-RPC `chat/history` method and optional `limit`/`before`/`after` window params on `chat/open`, plus opt-in pagination on remote HTTP `GET /api/v1/chats/:id`. Opaque cursors with a `lastCompaction` sentinel; meta exposes before/after/compaction cursors. ## 0.140.1 diff --git a/docs/protocol.md b/docs/protocol.md index 4ee727795..e3c2a6343 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -1986,6 +1986,11 @@ client indicator stays in sync with the auto-approval behavior the server will apply. Typically used after `chat/list` when the user selects a chat that has not been opened in the current client session. +By default the full history is replayed. A client may instead pass the optional +`limit`/`before`/`after` window parameters (same cursors as `chat/history`) to +replay only the newest page and avoid streaming a large history; in that case +the response carries `meta` so the client can page older via `chat/history`. + _Request:_ * method: `chat/open` @@ -1997,6 +2002,18 @@ interface ChatOpenParams { * The chat session identifier to open. */ chatId: string; + + /** + * Optional. Max messages to replay. When any of limit/before/after is + * present, only that window is replayed and `meta` is returned. + */ + limit?: number; + + /** Optional opaque cursor — replay the page older than the cursor. */ + before?: string; + + /** Optional opaque cursor — replay the page newer than the cursor. */ + after?: string; } ``` @@ -2016,6 +2033,84 @@ interface ChatOpenResponse { /** The chat title at the time of replay, when available. */ title?: string; + + /** Pagination metadata, present only when window params were supplied. */ + meta?: ChatHistoryMeta; + + /** Present instead of replaying when a supplied cursor is stale. */ + error?: { code: 'cursor_expired'; message: string }; +} +``` + +### Chat history (↩️) + +A client request to fetch a window of a persisted chat's history as a single +response (no streaming), so an editor can hydrate a bounded view and page older +on demand instead of receiving the full replay. The returned `contents` are the +same `ChatContent` items delivered by `chat/contentReceived`, so the client +reuses its existing renderer; live updates keep arriving over `chat/contentReceived`. + +Pagination uses opaque cursors. The page is anchored to the newest messages in +the selected window; an opaque `after` cursor pages forward instead. The literal +sentinel `"lastCompaction"` may be passed as `before`/`after`: `after: "lastCompaction"` +returns the messages since the last compaction (the active context), +`before: "lastCompaction"` the summarized-away history. Cursors are ephemeral — +a cursor whose message no longer exists (e.g. after a rollback or clear) returns +a `cursor_expired` error, prompting the client to refetch the latest page. + +_Request:_ + +* method: `chat/history` +* params: `ChatHistoryParams` defined as follows: + +```typescript +interface ChatHistoryParams { + /** The chat session identifier to fetch. */ + chatId: string; + + /** Optional. Max messages to return in the page. */ + limit?: number; + + /** Optional opaque cursor (or "lastCompaction") — page older. */ + before?: string; + + /** Optional opaque cursor (or "lastCompaction") — page newer. */ + after?: string; +} +``` + +_Response:_ + +```typescript +interface ChatHistoryResponse { + /** + * The page of history, as the same content items streamed by + * chat/contentReceived. Absent when `error` is present. + */ + contents?: ChatContentReceived[]; + + /** Pagination metadata. Absent when `error` is present. */ + meta?: ChatHistoryMeta; + + /** Present when the chat is unknown or a supplied cursor is stale. */ + error?: { code: 'chat_not_found' | 'cursor_expired'; message: string }; +} + +interface ChatHistoryMeta { + /** Total messages in the full history. */ + total: number; + + /** Number of messages in this page. */ + returned: number; + + /** Cursor to pass as `before` to load older; null at the start. */ + beforeCursor: string | null; + + /** Cursor to pass as `after` to load newer; null at the tail. */ + afterCursor: string | null; + + /** Cursor at the last compaction boundary; null when never compacted. */ + compactionCursor: string | null; } ``` diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index 0f787af2f..ca8c175a6 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -7,6 +7,7 @@ [eca.config :as config] [eca.db :as db] [eca.features.background-tasks :as bg] + [eca.features.chat.history :as history] [eca.features.chat.lifecycle :as lifecycle] [eca.features.chat.tool-calls :as tc] [eca.features.commands :as f.commands] @@ -255,26 +256,43 @@ :text (:text message-content) :contentId content-id}}])) +(defn messages->contents + "Pure transform of persisted `messages` into the flat sequence of chat-content + notification payloads ({:chat-id :role :content :parent-chat-id?}) that the + streaming replay sends, including nested subagent expansion. `db` is a plain + db snapshot used only to resolve subagent message histories. + + Shared by the streaming replay (`send-chat-contents!`) and the request/response + `chat/history` method so both render identical content." + [messages {:keys [chat-id parent-chat-id db]}] + (let [->payload (fn [{:keys [role content]}] + (assoc-some {:chat-id chat-id :role role :content content} + :parent-chat-id parent-chat-id))] + (into [] + (mapcat + (fn [message] + (let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message)) + subagent-chat-id (when (= "tool_call_output" (:role message)) + (get-in message [:content :details :subagent-chat-id])) + subagent-messages (when subagent-chat-id + (get-in db [:chats subagent-chat-id :messages]))] + (if (some? subagent-messages) + ;; For subagent tool calls: toolCallRun + toolCallRunning, then + ;; subagent messages, then toolCalled — matching live execution order. + (concat (map ->payload (butlast chat-contents)) + (messages->contents subagent-messages + {:chat-id subagent-chat-id + :parent-chat-id chat-id + :db db}) + [(->payload (last chat-contents))]) + (map ->payload chat-contents)))) + messages)))) + (defn ^:private send-chat-contents! [messages chat-ctx] - (doseq [message messages] - (let [chat-contents (message-content->chat-content (:role message) (:content message) (:content-id message)) - subagent-chat-id (when (= "tool_call_output" (:role message)) - (get-in message [:content :details :subagent-chat-id]))] - (if-let [subagent-messages (when subagent-chat-id - (get-in @(:db* chat-ctx) [:chats subagent-chat-id :messages]))] - ;; For subagent tool calls: send toolCallRun + toolCallRunning, then - ;; subagent messages, then toolCalled — matching live execution order. - (let [before-called (butlast chat-contents) - called (last chat-contents)] - (doseq [{:keys [role content]} before-called] - (lifecycle/send-content! chat-ctx role content)) - (send-chat-contents! subagent-messages - (assoc chat-ctx - :chat-id subagent-chat-id - :parent-chat-id (:chat-id chat-ctx))) - (lifecycle/send-content! chat-ctx (:role called) (:content called))) - (doseq [{:keys [role content]} chat-contents] - (lifecycle/send-content! chat-ctx role content)))))) + (doseq [payload (messages->contents messages {:chat-id (:chat-id chat-ctx) + :parent-chat-id (:parent-chat-id chat-ctx) + :db @(:db* chat-ctx)})] + (messenger/chat-content-received (:messenger chat-ctx) payload))) (defn default-model [db config] (llm-api/default-model db config)) @@ -1901,14 +1919,30 @@ selected model to the resumed chat's stored `:model` via a `config/updated` notification, so an Opus-started chat keeps using Opus on the next prompt (#417). Performs no DB mutation otherwise. + + Optional `:limit`/`:before`/`:after` window the replay (same cursors as + `chat/history`); when provided, the response includes `:meta` so the client + can page older via `chat/history`. Without them the full history is replayed. Returns `{:found? false}` when the chat does not exist or is a subagent, - otherwise `{:found? true :chat-id ... :title ...}`." - [{:keys [chat-id]} db* messenger config] - (let [chat (get-in @db* [:chats chat-id])] - (if (or (nil? chat) (:subagent chat)) + otherwise `{:found? true :chat-id ... :title ... :meta? ...}`." + [{:keys [chat-id limit before after] :as params} db* messenger config] + (let [chat (get-in @db* [:chats chat-id]) + windowed? (some #(contains? params %) [:limit :before :after]) + page (when windowed? + (history/window-messages (vec (:messages chat)) + {:limit limit :before before :after after}))] + (cond + (or (nil? chat) (:subagent chat)) {:found? false} + + (= :cursor-expired (:error page)) + {:found? true :chat-id chat-id + :error {:code "cursor_expired" + :message "Cursor no longer points to an existing message; refetch the latest page"}} + + :else (let [title (:title chat) - messages (:messages chat) + messages (if windowed? (:messages page) (:messages chat)) chat-ctx {:chat-id chat-id :db* db* :messenger messenger}] (mark-editor-open! db* chat-id) (messenger/chat-cleared messenger {:chat-id chat-id :messages true}) @@ -1917,4 +1951,35 @@ (lifecycle/send-content! chat-ctx :system (assoc-some {:type :metadata} :title title)) (config/notify-selected-model-changed! (:model chat) db* messenger config (:variant chat)) (config/notify-selected-trust-changed! (:trust chat) db* messenger) - {:found? true :chat-id chat-id :title title})))) + (assoc-some {:found? true :chat-id chat-id :title title} + :meta (when windowed? + {:total (:total page) + :returned (:returned page) + :before-cursor (:before-cursor page) + :after-cursor (:after-cursor page) + :compaction-cursor (history/compaction-cursor (:messages chat))})))))) + +(defn fetch-history + "Window a chat's persisted messages and return the transformed content items + plus pagination meta, for the request/response `chat/history` method. + + `before`/`after` are opaque cursors (or the `lastCompaction` sentinel) and + `limit` bounds the page. Returns {:contents [...] :meta {...}}, or + {:error {:code :message}} when the chat is unknown or a cursor is stale." + [{:keys [chat-id limit before after]} db*] + (let [db @db* + chat (get-in db [:chats chat-id])] + (if (or (nil? chat) (:subagent chat)) + {:error {:code "chat_not_found" :message (str "Chat " chat-id " not found")}} + (let [messages (vec (:messages chat)) + result (history/window-messages messages {:limit limit :before before :after after})] + (if (= :cursor-expired (:error result)) + {:error {:code "cursor_expired" + :message "Cursor no longer points to an existing message; refetch the latest page"}} + {:contents (messages->contents (:messages result) + {:chat-id chat-id :parent-chat-id nil :db db}) + :meta {:total (:total result) + :returned (:returned result) + :before-cursor (:before-cursor result) + :after-cursor (:after-cursor result) + :compaction-cursor (history/compaction-cursor messages)}}))))) diff --git a/src/eca/features/chat/history.clj b/src/eca/features/chat/history.clj new file mode 100644 index 000000000..8d890f1af --- /dev/null +++ b/src/eca/features/chat/history.clj @@ -0,0 +1,122 @@ +(ns eca.features.chat.history + "Transport-agnostic windowing and cursor logic over a chat's persisted + messages, shared by the remote HTTP API and the JSON-RPC `chat/history` + method. + + Cursors are opaque tokens: base64url of \".\" where checksum + fingerprints the message. Resolving trusts the encoded index when its checksum + still matches, else rescans by checksum (tolerating shifts like flag + insert/remove); a cursor whose message is gone resolves to :expired." + (:require + [clojure.string :as string]) + (:import + [java.nio.charset StandardCharsets] + [java.util Base64])) + +(set! *warn-on-reflection* true) + +(def last-compaction-sentinel + "Literal cursor value resolving to the position of the last compaction marker." + "lastCompaction") + +(defn ^:private message-checksum + "Stable-enough fingerprint of a message to validate/resolve a cursor." + [message] + (Integer/toHexString (hash [(:role message) (:created-at message) (:content message)]))) + +(defn ^:private encode-cursor [idx message] + (let [raw (.getBytes (str idx "." (message-checksum message)) StandardCharsets/UTF_8)] + (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) raw))) + +(defn ^:private decode-cursor [^String cursor] + (try + (let [decoded (String. (.decode (Base64/getUrlDecoder) cursor) StandardCharsets/UTF_8) + [idx-s checksum] (string/split decoded #"\." 2)] + {:idx (Integer/parseInt idx-s) :checksum checksum}) + (catch Exception _ nil))) + +(defn ^:private last-compaction-idx + "Index of the last compact_marker message, or nil when never compacted." + [messages] + (loop [i (dec (count messages))] + (cond + (neg? i) nil + (= "compact_marker" (:role (nth messages i))) i + :else (recur (dec i))))) + +(defn ^:private resolve-cursor + "Resolves an opaque cursor to an index, or :expired if its message is gone." + [messages ^String cursor] + (if-let [{:keys [idx checksum]} (decode-cursor cursor)] + (if (and (<= 0 idx) (< idx (count messages)) + (= checksum (message-checksum (nth messages idx)))) + idx + (or (first (keep-indexed (fn [i m] (when (= checksum (message-checksum m)) i)) messages)) + :expired)) + :expired)) + +(defn ^:private resolve-bound + "Resolves a `before`/`after` value to an index, :expired, or nil (absent). + The `lastCompaction` sentinel resolves to the last compaction marker, or + :no-compaction when the chat was never compacted." + [messages value] + (when value + (if (= value last-compaction-sentinel) + (or (last-compaction-idx messages) :no-compaction) + (resolve-cursor messages value)))) + +(defn ^:private coerce-limit + "Coerces a limit that may arrive as an int (JSON-RPC) or string (HTTP query)." + [value] + (when value + (try + (let [n (if (integer? value) value (Integer/parseInt (str value)))] + (when (pos? n) n)) + (catch Exception _ nil)))) + +(defn window-messages + "Windows `messages` by the (after, before) cursors and `limit`. + + `after` is an exclusive lower bound, `before` an exclusive upper bound. The + page is the newest `limit` messages in the window; an opaque `after` cursor as + the only bound pages forward (oldest `limit`). `before-cursor`/`after-cursor` + are computed against full history so paging crosses the compaction boundary, + and are nil at the ends. + + Returns {:messages :before-cursor :after-cursor :total :returned} or + {:error :cursor-expired}." + [messages {:keys [limit before after]}] + (let [messages (vec messages) + n (count messages) + limit (coerce-limit limit) + after-res (resolve-bound messages after) + before-res (resolve-bound messages before)] + (if (or (= :expired after-res) (= :expired before-res)) + {:error :cursor-expired} + (let [lo (if (integer? after-res) after-res -1) ;; exclusive lower + hi (if (integer? before-res) before-res n) ;; exclusive upper + win-start (max 0 (min (inc lo) n)) + win-end (max win-start (min hi n)) + ;; Opaque `after` cursor pages forward; the sentinel stays newest-anchored. + forward? (and (integer? after-res) + (not= after last-compaction-sentinel) + (nil? before-res)) + [slice-start slice-end] + (cond + (nil? limit) [win-start win-end] + forward? [win-start (min win-end (+ win-start limit))] + :else [(max win-start (- win-end limit)) win-end]) + slice (subvec messages slice-start slice-end)] + {:messages slice + :total n + :returned (count slice) + :before-cursor (when (pos? slice-start) + (encode-cursor slice-start (nth messages slice-start))) + :after-cursor (when (< slice-end n) + (encode-cursor (dec slice-end) (nth messages (dec slice-end))))})))) + +(defn compaction-cursor + "Opaque cursor for the last compaction boundary, or nil when never compacted." + [messages] + (when-let [idx (last-compaction-idx (vec messages))] + (encode-cursor idx (nth (vec messages) idx)))) diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index eb8136aa5..e183f64fe 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -289,6 +289,15 @@ (metrics/task metrics :eca/chat-open (f.chat/open-chat! params db* messenger config))) +(defn chat-history + "Fetch a window of a chat's history (request/response, no streaming). + Supports optional :limit/:before/:after for pagination (see + `f.chat/fetch-history`). Returns `{:contents [...] :meta {...}}` or + `{:error {:code :message}}`." + [{:keys [db* metrics]} params] + (metrics/task metrics :eca/chat-history + (f.chat/fetch-history params db*))) + (defn mcp-stop-server [{:keys [db* messenger metrics config]} params] (metrics/task metrics :eca/mcp-stop-server (f.tools/stop-server! (:name params) db* messenger config metrics))) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 2927a7135..cfbf4e1c6 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -5,9 +5,9 @@ (:require [cheshire.core :as json] [clojure.core.async :as async] - [clojure.string :as string] [eca.config :as config] [eca.features.chat :as f.chat] + [eca.features.chat.history :as f.chat.history] [eca.handlers :as handlers] [eca.remote.messenger :as remote.messenger] [eca.remote.sse :as sse] @@ -15,9 +15,7 @@ [ring.core.protocols :as ring.protocols]) (:import [java.io InputStream] - [java.nio.charset StandardCharsets] - [java.time Instant] - [java.util Base64])) + [java.time Instant])) (set! *warn-on-reflection* true) @@ -166,114 +164,6 @@ (mapv chat-summary))] (json-response chats))) -;; --- Message pagination (opt-in via `limit`/`before`/`after` query params) ----- -;; -;; Cursors are opaque tokens: base64url of "." where checksum -;; fingerprints the message. Resolving trusts the index when its checksum still -;; matches, else rescans by checksum (tolerating shifts like flag insert/remove); -;; a cursor whose message is gone resolves to :expired (-> 409). - -(def ^:private last-compaction-sentinel - "Literal cursor value resolving to the position of the last compaction marker." - "lastCompaction") - -(defn ^:private message-checksum - "Stable-enough fingerprint of a message to validate/resolve a cursor." - [message] - (Integer/toHexString (hash [(:role message) (:created-at message) (:content message)]))) - -(defn ^:private encode-cursor [idx message] - (let [raw (.getBytes (str idx "." (message-checksum message)) StandardCharsets/UTF_8)] - (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) raw))) - -(defn ^:private decode-cursor [^String cursor] - (try - (let [decoded (String. (.decode (Base64/getUrlDecoder) cursor) StandardCharsets/UTF_8) - [idx-s checksum] (string/split decoded #"\." 2)] - {:idx (Integer/parseInt idx-s) :checksum checksum}) - (catch Exception _ nil))) - -(defn ^:private last-compaction-idx - "Index of the last compact_marker message, or nil when never compacted." - [messages] - (loop [i (dec (count messages))] - (cond - (neg? i) nil - (= "compact_marker" (:role (nth messages i))) i - :else (recur (dec i))))) - -(defn ^:private resolve-cursor - "Resolves an opaque cursor to an index, or :expired if its message is gone." - [messages ^String cursor] - (if-let [{:keys [idx checksum]} (decode-cursor cursor)] - (if (and (<= 0 idx) (< idx (count messages)) - (= checksum (message-checksum (nth messages idx)))) - idx - (or (first (keep-indexed (fn [i m] (when (= checksum (message-checksum m)) i)) messages)) - :expired)) - :expired)) - -(defn ^:private resolve-bound - "Resolves a `before`/`after` query value to an index, :expired, or nil (absent). - The `lastCompaction` sentinel resolves to the last compaction marker, or - :no-compaction when the chat was never compacted." - [messages value] - (when value - (if (= value last-compaction-sentinel) - (or (last-compaction-idx messages) :no-compaction) - (resolve-cursor messages value)))) - -(defn ^:private parse-limit [value] - (when value - (try - (let [n (Integer/parseInt (str value))] - (when (pos? n) n)) - (catch Exception _ nil)))) - -(defn ^:private paginate-messages - "Windows `messages` by the (after, before) cursors and `limit`. - - `after` is an exclusive lower bound, `before` an exclusive upper bound. The - page is the newest `limit` messages in the window; an opaque `after` cursor as - the only bound pages forward (oldest `limit`). `before-cursor`/`after-cursor` - are computed against full history so paging crosses the compaction boundary, - and are nil at the ends. - - Returns {:messages :before-cursor :after-cursor :total :returned} or - {:error :cursor-expired}." - [messages {:keys [limit before after]}] - (let [messages (vec messages) - n (count messages) - after-res (resolve-bound messages after) - before-res (resolve-bound messages before)] - (if (or (= :expired after-res) (= :expired before-res)) - {:error :cursor-expired} - (let [lo (if (integer? after-res) after-res -1) ;; exclusive lower - hi (if (integer? before-res) before-res n) ;; exclusive upper - win-start (max 0 (min (inc lo) n)) - win-end (max win-start (min hi n)) - ;; Opaque `after` cursor pages forward; the sentinel stays newest-anchored. - forward? (and (integer? after-res) - (not= after last-compaction-sentinel) - (nil? before-res)) - [slice-start slice-end] - (cond - (nil? limit) [win-start win-end] - forward? [win-start (min win-end (+ win-start limit))] - :else [(max win-start (- win-end limit)) win-end]) - slice (subvec messages slice-start slice-end)] - {:messages slice - :total n - :returned (count slice) - :before-cursor (when (pos? slice-start) - (encode-cursor slice-start (nth messages slice-start))) - :after-cursor (when (< slice-end n) - (encode-cursor (dec slice-end) (nth messages (dec slice-end))))})))) - -(defn ^:private compaction-cursor [messages] - (when-let [idx (last-compaction-idx messages)] - (encode-cursor idx (nth messages idx)))) - (defn handle-get-chat [{:keys [db*]} request chat-id] (if-let [chat (chat-or-404 db* chat-id)] (let [db @db* @@ -282,10 +172,10 @@ paginate? (some #(contains? params %) [:limit :before :after]) all-messages (vec (or (:messages chat) [])) page (when paginate? - (paginate-messages all-messages - {:limit (parse-limit (:limit params)) - :before (:before params) - :after (:after params)})) + (f.chat.history/window-messages all-messages + {:limit (:limit params) + :before (:before params) + :after (:after params)})) base {:id (:id chat) :title (:title chat) :status (or (:status chat) :idle) @@ -311,7 +201,7 @@ :returned (:returned page) :before-cursor (:before-cursor page) :after-cursor (:after-cursor page) - :compaction-cursor (compaction-cursor all-messages)}))) + :compaction-cursor (f.chat.history/compaction-cursor all-messages)}))) :else (json-response (camel-keys base)))) diff --git a/src/eca/server.clj b/src/eca/server.clj index c438bfc78..f9a54acca 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -187,6 +187,9 @@ (defmethod jsonrpc.server/receive-request "chat/open" [_ components params] (eventually (handlers/chat-open (with-config components) params))) +(defmethod jsonrpc.server/receive-request "chat/history" [_ components params] + (eventually (handlers/chat-history (with-config components) params))) + (defmethod jsonrpc.server/receive-notification "mcp/stopServer" [_ components params] (async-notify (handlers/mcp-stop-server (with-config components) params))) diff --git a/test/eca/features/chat/history_test.clj b/test/eca/features/chat/history_test.clj new file mode 100644 index 000000000..3a4dc5fe4 --- /dev/null +++ b/test/eca/features/chat/history_test.clj @@ -0,0 +1,80 @@ +(ns eca.features.chat.history-test + (:require + [clojure.test :refer [deftest is testing]] + [eca.features.chat.history :as f.chat.history])) + +(defn- msg [role text created-at] + {:role role :content [{:type :text :text text}] :created-at created-at}) + +(def ^:private messages + [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + (msg "user" "m2" 3) + (msg "assistant" "m3" 4) + (msg "user" "m4" 5)]) + +(defn- texts [result] + (mapv #(get-in % [:content 0 :text]) (:messages result))) + +(deftest window-messages-test + (testing "limit returns the newest N messages" + (let [r (f.chat.history/window-messages messages {:limit 2})] + (is (= ["m3" "m4"] (texts r))) + (is (= 5 (:total r))) + (is (= 2 (:returned r))) + (is (some? (:before-cursor r))) + (is (nil? (:after-cursor r))))) + + (testing "limit accepts string (HTTP) and int (JSON-RPC)" + (is (= ["m3" "m4"] (texts (f.chat.history/window-messages messages {:limit "2"})))) + (is (= ["m3" "m4"] (texts (f.chat.history/window-messages messages {:limit 2}))))) + + (testing "no limit returns the whole window" + (let [r (f.chat.history/window-messages messages {})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts r))) + (is (nil? (:before-cursor r))) + (is (nil? (:after-cursor r))))) + + (testing "before cursor loads the older page" + (let [before (:before-cursor (f.chat.history/window-messages messages {:limit 2})) + r (f.chat.history/window-messages messages {:limit 2 :before before})] + (is (= ["m1" "m2"] (texts r))) + (is (some? (:before-cursor r))) + (is (some? (:after-cursor r))))) + + (testing "opaque after cursor pages forward" + (let [first-cursor (:before-cursor + (f.chat.history/window-messages messages {:limit 4})) + ;; first-cursor points at m1 (index 1); forward page after it + r (f.chat.history/window-messages messages {:limit 2 :after first-cursor})] + (is (= ["m2" "m3"] (texts r))))) + + (testing "expired cursor returns error" + (let [r (f.chat.history/window-messages messages {:before "not-a-real-cursor"})] + (is (= :cursor-expired (:error r)))))) + +(deftest compaction-window-test + (let [with-marker [(msg "user" "m0" 1) + (msg "assistant" "m1" 2) + {:role "compact_marker" :content {:auto? false} :created-at 3} + (msg "user" "summary" 4) + (msg "assistant" "m4" 5)]] + (testing "after=lastCompaction returns post-compaction messages, newest-anchored" + (let [r (f.chat.history/window-messages with-marker {:after f.chat.history/last-compaction-sentinel})] + (is (= ["summary" "m4"] (texts r))) + (is (nil? (:after-cursor r))) + (is (some? (:before-cursor r)))) + (let [r (f.chat.history/window-messages with-marker {:after f.chat.history/last-compaction-sentinel :limit 1})] + (is (= ["m4"] (texts r))))) + + (testing "before=lastCompaction returns the summarized-away history" + (let [r (f.chat.history/window-messages with-marker {:before f.chat.history/last-compaction-sentinel})] + (is (= ["m0" "m1"] (texts r))))) + + (testing "compaction-cursor is present only when compacted" + (is (some? (f.chat.history/compaction-cursor with-marker))) + (is (nil? (f.chat.history/compaction-cursor messages)))) + + (testing "lastCompaction with no marker falls back to the full window" + (let [r (f.chat.history/window-messages messages {:after f.chat.history/last-compaction-sentinel})] + (is (= ["m0" "m1" "m2" "m3" "m4"] (texts r))))))) diff --git a/test/eca/features/chat_test.clj b/test/eca/features/chat_test.clj index cc06f69d8..58353fa07 100644 --- a/test/eca/features/chat_test.clj +++ b/test/eca/features/chat_test.clj @@ -1508,6 +1508,87 @@ (is (match? {:config-updated [{:chat {:select-trust false}}]} (h/messages)))))) +(deftest fetch-history-test + (let [text-msg (fn [text n] {:role "user" :content [{:type :text :text text}] :created-at n}) + seed! (fn [chat-id msgs] + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats chat-id] {:id chat-id :messages msgs}) + chat-id) + content-texts (fn [{:keys [contents]}] + (keep #(some-> (get-in % [:content :text]) string/trim) contents))] + (testing "unknown chat returns an error" + (h/reset-components!) + (is (= "chat_not_found" (get-in (f.chat/fetch-history {:chat-id "nope"} (h/db*)) [:error :code])))) + + (testing "subagent chat is not fetchable" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "sub"] {:id "sub" :subagent true :messages []}) + (is (= "chat_not_found" (get-in (f.chat/fetch-history {:chat-id "sub"} (h/db*)) [:error :code])))) + + (testing "returns transformed content items plus meta" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) (text-msg "m1" 2) (text-msg "m2" 3)]) + result (f.chat/fetch-history {:chat-id chat-id :limit 2} (h/db*))] + (is (= ["m1" "m2"] (content-texts result))) + (is (every? #(= chat-id (:chat-id %)) (:contents result))) + (is (= 3 (get-in result [:meta :total]))) + (is (= 2 (get-in result [:meta :returned]))) + (is (some? (get-in result [:meta :before-cursor]))) + (is (nil? (get-in result [:meta :after-cursor]))))) + + (testing "before cursor pages older" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) (text-msg "m1" 2) (text-msg "m2" 3)]) + before (get-in (f.chat/fetch-history {:chat-id chat-id :limit 2} (h/db*)) [:meta :before-cursor]) + result (f.chat/fetch-history {:chat-id chat-id :limit 2 :before before} (h/db*))] + ;; first page was [m1 m2]; older-than-m1 is just [m0] + (is (= ["m0"] (content-texts result))))) + + (testing "after=lastCompaction returns the active context with a compaction cursor" + (let [chat-id (seed! "c1" [(text-msg "m0" 1) + {:role "compact_marker" :content {:auto? false} :created-at 2} + (text-msg "summary" 3) + (text-msg "m3" 4)]) + result (f.chat/fetch-history {:chat-id chat-id :after "lastCompaction"} (h/db*))] + (is (= ["summary" "m3"] (content-texts result))) + (is (some? (get-in result [:meta :compaction-cursor]))))) + + (testing "stale cursor returns cursor_expired" + (let [chat-id (seed! "c1" [(text-msg "m0" 1)])] + (is (= "cursor_expired" + (get-in (f.chat/fetch-history {:chat-id chat-id :before "bogus"} (h/db*)) [:error :code]))))))) + +(deftest open-chat-windowed-test + (let [msg (fn [t n] {:role "user" :content [{:type :text :text t}] :created-at n}) + replayed-texts (fn [] + (->> (h/messages) :chat-content-received + (keep #(some-> (get-in % [:content :text]) string/trim)) + vec))] + (testing "no window params replays the full history with no meta" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "full"] + {:id "full" :messages [(msg "m0" 1) (msg "m1" 2) (msg "m2" 3)]}) + (let [result (f.chat/open-chat! {:chat-id "full"} (h/db*) (h/messenger) (h/config))] + (is (nil? (:meta result))) + (is (= ["m0" "m1" "m2"] (replayed-texts))))) + + (testing "limit replays only the window and returns meta" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "win"] + {:id "win" :messages [(msg "m0" 1) (msg "m1" 2) (msg "m2" 3)]}) + (let [result (f.chat/open-chat! {:chat-id "win" :limit 2} (h/db*) (h/messenger) (h/config))] + (is (match? {:found? true :chat-id "win" + :meta {:total 3 :returned 2 :after-cursor nil}} + result)) + (is (some? (get-in result [:meta :before-cursor]))) + (is (= ["m1" "m2"] (replayed-texts))))) + + (testing "expired cursor errors before any replay" + (h/reset-components!) + (swap! (h/db*) assoc-in [:chats "exp"] + {:id "exp" :messages [(msg "m0" 1)]}) + (let [result (f.chat/open-chat! {:chat-id "exp" :before "bogus"} (h/db*) (h/messenger) (h/config))] + (is (= "cursor_expired" (get-in result [:error :code]))) + (is (= {} (h/messages)) "no notifications emitted on expired cursor"))))) + (deftest prompt-cache-agent-and-model-switch-test (testing "local static prompt cache is reused only when both agent and model match" (h/reset-components!) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index 5106ee784..7077ddc59 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -3,6 +3,7 @@ [cheshire.core :as json] [clojure.test :refer [deftest is testing]] [eca.config :as config] + [eca.features.chat.history :as f.chat.history] [eca.messenger :as messenger] [eca.remote.handlers :as handlers] [eca.remote.messenger :as remote.messenger] @@ -266,7 +267,7 @@ (testing "opaque after cursor forward-pages the messages right after it" (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) - (let [after-cursor (#'handlers/encode-cursor 0 (nth messages 0)) + (let [after-cursor (#'f.chat.history/encode-cursor 0 (nth messages 0)) {:keys [parsed]} (get-chat {:limit "2" :after after-cursor})] (is (= ["m1" "m2"] (texts parsed))))) @@ -279,7 +280,7 @@ (testing "expired cursor returns 409" (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "T" :status :idle :messages messages}) - (let [bogus (#'handlers/encode-cursor 99 {:role "x" :content "gone" :created-at 999999}) + (let [bogus (#'f.chat.history/encode-cursor 99 {:role "x" :content "gone" :created-at 999999}) response (handlers/handle-get-chat (components) {:params {:before bogus}} "c1") body (json/parse-string (:body response) true)] (is (= 409 (:status response)))