From 020d8d1d4f0b216b6a6ea7fe5262dc86630110be Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Mon, 13 Apr 2026 20:18:55 +0200 Subject: [PATCH 1/4] Use mid-turn steering for messages sent while the agent is busy Instead of holding messages in a client-side queue and replaying them as separate turns after the stream stops, the TUI now calls runtime.Steer() to inject them mid-turn. The agent sees steered messages at the next tool-round boundary via tags, enabling real-time course corrections without waiting for the full turn to complete. --- pkg/app/app.go | 7 + pkg/tui/page/chat/chat.go | 49 ++---- pkg/tui/page/chat/queue_test.go | 226 ++++++++++++++++++---------- pkg/tui/page/chat/runtime_events.go | 8 +- pkg/tui/tui.go | 6 +- 5 files changed, 172 insertions(+), 124 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index bd0637fec..307942d9d 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -340,6 +340,13 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string }() } +// Steer enqueues a user message for mid-turn injection into the running agent +// loop. The agent will see the message at the next tool-round boundary. Returns +// an error if the steer queue is full. +func (a *App) Steer(content string) error { + return a.runtime.Steer(runtime.QueuedMessage{Content: content}) +} + // processFileAttachment reads a file from disk, classifies it, and either // appends its text content to textBuilder or adds a binary part to binaryParts. func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) { diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index b39d3b1ac..c00aab8a7 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -125,9 +125,6 @@ type queuedMessage struct { attachments []msgtypes.Attachment } -// maxQueuedMessages is the maximum number of messages that can be queued -const maxQueuedMessages = 5 - // chatPage implements Page type chatPage struct { width, height int @@ -406,10 +403,10 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) { } cmds = append(cmds, p.messages.ScrollToBottom()) - // Process next queued message after cancel (queue is preserved) - if queueCmd := p.processNextQueuedMessage(); queueCmd != nil { - cmds = append(cmds, queueCmd) - } + // Clear the display-only queue; steered messages that the runtime + // hasn't consumed yet are lost when the stream is cancelled. + p.messageQueue = nil + p.syncQueueToSidebar() return p, tea.Batch(cmds...) @@ -687,22 +684,20 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) { return p, cmd } - // If queue is full, reject the message - if len(p.messageQueue) >= maxQueuedMessages { - return p, notification.WarningCmd(fmt.Sprintf("Queue full (max %d messages). Please wait.", maxQueuedMessages)) + // Steer the message into the running agent loop. The runtime injects it + // at the next tool-round boundary so the model sees it mid-turn. + if err := p.app.Steer(msg.Content); err != nil { + return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.") } - // Add to queue + // Track for sidebar display; cleared when the stream stops. p.messageQueue = append(p.messageQueue, queuedMessage{ content: msg.Content, attachments: msg.Attachments, }) p.syncQueueToSidebar() - queueLen := len(p.messageQueue) - notifyMsg := fmt.Sprintf("Message queued (%d waiting) · Ctrl+X to clear", queueLen) - - return p, notification.InfoCmd(notifyMsg) + return p, notification.InfoCmd("Message steered · agent will see it at the next step") } func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) { @@ -826,28 +821,8 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach return attachments } -// processNextQueuedMessage pops the next message from the queue and processes it. -// Returns nil if the queue is empty. -func (p *chatPage) processNextQueuedMessage() tea.Cmd { - if len(p.messageQueue) == 0 { - return nil - } - - // Pop the first message from the queue - queued := p.messageQueue[0] - p.messageQueue[0] = queuedMessage{} // zero out to allow GC - p.messageQueue = p.messageQueue[1:] - p.syncQueueToSidebar() - - msg := msgtypes.SendMsg{ - Content: queued.content, - Attachments: queued.attachments, - } - - return p.processMessage(msg) -} - -// handleClearQueue clears all queued messages and shows a notification. +// handleClearQueue clears the display-only queue of steered messages and shows a notification. +// Note: messages already delivered to the runtime's steer queue cannot be recalled. func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { count := len(p.messageQueue) if count == 0 { diff --git a/pkg/tui/page/chat/queue_test.go b/pkg/tui/page/chat/queue_test.go index 5a9ebffd7..7e82c0f86 100644 --- a/pkg/tui/page/chat/queue_test.go +++ b/pkg/tui/page/chat/queue_test.go @@ -1,141 +1,190 @@ package chat import ( + "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/docker/docker-agent/pkg/app" + "github.com/docker/docker-agent/pkg/runtime" + "github.com/docker/docker-agent/pkg/session" + "github.com/docker/docker-agent/pkg/sessiontitle" + "github.com/docker/docker-agent/pkg/tools" + "github.com/docker/docker-agent/pkg/tools/builtin" + mcptools "github.com/docker/docker-agent/pkg/tools/mcp" "github.com/docker/docker-agent/pkg/tui/components/sidebar" "github.com/docker/docker-agent/pkg/tui/messages" "github.com/docker/docker-agent/pkg/tui/service" ) -// newTestChatPage creates a minimal chatPage for testing queue behavior. -// Note: This only initializes fields needed for queue testing. -// processMessage cannot be called without full initialization. -func newTestChatPage(t *testing.T) *chatPage { - t.Helper() - sessionState := &service.SessionState{} +// steerRuntime is a minimal runtime.Runtime for testing steer behaviour. +type steerRuntime struct { + steered []runtime.QueuedMessage + steerFn func(runtime.QueuedMessage) error // optional override +} - return &chatPage{ - sidebar: sidebar.New(sessionState), - sessionState: sessionState, - working: true, // Start busy so messages get queued +func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error { + if r.steerFn != nil { + return r.steerFn(msg) } + r.steered = append(r.steered, msg) + return nil } -func TestQueueFlow_BusyAgent_QueuesMessage(t *testing.T) { - t.Parallel() +// Remaining interface methods — no-ops for this test. - p := newTestChatPage(t) - // newTestChatPage already sets working=true +func (r *steerRuntime) CurrentAgentName() string { return "test" } - // Send first message while busy - msg1 := messages.SendMsg{Content: "first message"} - _, cmd := p.handleSendMsg(msg1) +func (r *steerRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo { + return runtime.CurrentAgentInfo{} +} - // Should be queued - require.Len(t, p.messageQueue, 1) - assert.Equal(t, "first message", p.messageQueue[0].content) - // Command should be a notification (not processMessage) - assert.NotNil(t, cmd) +func (r *steerRuntime) SetCurrentAgent(string) error { return nil } - // Send second message while still busy - msg2 := messages.SendMsg{Content: "second message"} - _, _ = p.handleSendMsg(msg2) +func (r *steerRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { + return nil, nil +} - require.Len(t, p.messageQueue, 2) - assert.Equal(t, "first message", p.messageQueue[0].content) - assert.Equal(t, "second message", p.messageQueue[1].content) +func (r *steerRuntime) EmitStartupInfo(_ context.Context, _ *session.Session, _ chan runtime.Event) { + // Do not close the channel — app.New's goroutine defers the close. +} - // Send third message - msg3 := messages.SendMsg{Content: "third message"} - _, _ = p.handleSendMsg(msg3) +func (r *steerRuntime) ResetStartupInfo() {} - require.Len(t, p.messageQueue, 3) +func (r *steerRuntime) RunStream(context.Context, *session.Session) <-chan runtime.Event { + ch := make(chan runtime.Event) + close(ch) + return ch } -func TestQueueFlow_QueueFull_RejectsMessage(t *testing.T) { - t.Parallel() +func (r *steerRuntime) Run(context.Context, *session.Session) ([]session.Message, error) { + return nil, nil +} - p := newTestChatPage(t) - // newTestChatPage sets working=true +func (r *steerRuntime) Resume(context.Context, runtime.ResumeRequest) {} - // Fill the queue to max - for i := range maxQueuedMessages { - msg := messages.SendMsg{Content: "message"} - _, _ = p.handleSendMsg(msg) - assert.Len(t, p.messageQueue, i+1) - } +func (r *steerRuntime) ResumeElicitation(context.Context, tools.ElicitationAction, map[string]any) error { + return nil +} + +func (r *steerRuntime) SessionStore() session.Store { return nil } + +func (r *steerRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {} + +func (r *steerRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil } + +func (r *steerRuntime) CurrentAgentSkillsToolset() *builtin.SkillsToolset { return nil } + +func (r *steerRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo { + return nil +} + +func (r *steerRuntime) ExecuteMCPPrompt(context.Context, string, map[string]string) (string, error) { + return "", nil +} + +func (r *steerRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error { + return nil +} + +func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil } + +func (r *steerRuntime) Close() error { return nil } + +func (r *steerRuntime) FollowUp(runtime.QueuedMessage) error { return nil } - require.Len(t, p.messageQueue, maxQueuedMessages) +func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} - // Try to add one more - should be rejected - msg := messages.SendMsg{Content: "overflow message"} - _, cmd := p.handleSendMsg(msg) +// newTestChatPage creates a minimal chatPage for testing steer/queue behaviour. +func newTestChatPage(t *testing.T) (*chatPage, *steerRuntime) { + t.Helper() + sessionState := &service.SessionState{} - // Queue size should not change - assert.Len(t, p.messageQueue, maxQueuedMessages) - // Should return a warning notification command - assert.NotNil(t, cmd) + rt := &steerRuntime{} + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + a := app.New(ctx, rt, session.New()) + + return &chatPage{ + sidebar: sidebar.New(sessionState), + sessionState: sessionState, + working: true, // Start busy so messages get steered + app: a, + }, rt } -func TestQueueFlow_PopFromQueue(t *testing.T) { +func TestSteer_BusyAgent_SteersMessage(t *testing.T) { t.Parallel() - p := newTestChatPage(t) + p, rt := newTestChatPage(t) - // Queue some messages - p.handleSendMsg(messages.SendMsg{Content: "first"}) - p.handleSendMsg(messages.SendMsg{Content: "second"}) - p.handleSendMsg(messages.SendMsg{Content: "third"}) + // Send first message while busy — should steer to runtime + msg1 := messages.SendMsg{Content: "first message"} + _, cmd := p.handleSendMsg(msg1) + assert.NotNil(t, cmd) // notification command - require.Len(t, p.messageQueue, 3) + require.Len(t, rt.steered, 1) + assert.Equal(t, "first message", rt.steered[0].Content) + // Display queue should track the steered message + require.Len(t, p.messageQueue, 1) + assert.Equal(t, "first message", p.messageQueue[0].content) - // Manually pop messages (simulating what processNextQueuedMessage does internally) - // Pop first - popped := p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] - p.syncQueueToSidebar() + // Send second message + msg2 := messages.SendMsg{Content: "second message"} + _, _ = p.handleSendMsg(msg2) - assert.Equal(t, "first", popped.content) + require.Len(t, rt.steered, 2) + assert.Equal(t, "second message", rt.steered[1].Content) require.Len(t, p.messageQueue, 2) - assert.Equal(t, "second", p.messageQueue[0].content) - assert.Equal(t, "third", p.messageQueue[1].content) +} - // Pop second - popped = p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] +func TestSteer_QueueFull_RejectsMessage(t *testing.T) { + t.Parallel() - assert.Equal(t, "second", popped.content) - require.Len(t, p.messageQueue, 1) - assert.Equal(t, "third", p.messageQueue[0].content) + p, rt := newTestChatPage(t) + + // Make the runtime's steer queue reject after the first call + calls := 0 + rt.steerFn = func(msg runtime.QueuedMessage) error { + calls++ + if calls > 3 { + return errors.New("steer queue full") + } + rt.steered = append(rt.steered, msg) + return nil + } - // Pop last - popped = p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] + // First 3 messages succeed + for i := range 3 { + _, _ = p.handleSendMsg(messages.SendMsg{Content: "message"}) + assert.Len(t, rt.steered, i+1) + } - assert.Equal(t, "third", popped.content) - assert.Empty(t, p.messageQueue) + // Fourth message should be rejected by the runtime + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"}) + assert.NotNil(t, cmd) // warning notification + assert.Len(t, rt.steered, 3) + // Display queue should not grow when steer fails + assert.Len(t, p.messageQueue, 3) } -func TestQueueFlow_ClearQueue(t *testing.T) { +func TestSteer_ClearQueue(t *testing.T) { t.Parallel() - p := newTestChatPage(t) - // newTestChatPage sets working=true + p, _ := newTestChatPage(t) - // Queue some messages + // Steer some messages p.handleSendMsg(messages.SendMsg{Content: "first"}) p.handleSendMsg(messages.SendMsg{Content: "second"}) p.handleSendMsg(messages.SendMsg{Content: "third"}) require.Len(t, p.messageQueue, 3) - // Clear the queue + // Clear the display queue _, cmd := p.handleClearQueue() - assert.Empty(t, p.messageQueue) assert.NotNil(t, cmd) // Success notification @@ -144,3 +193,16 @@ func TestQueueFlow_ClearQueue(t *testing.T) { assert.Empty(t, p.messageQueue) assert.NotNil(t, cmd) // Info notification } + +func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + p.working = false // agent is idle + + // When idle, handleSendMsg should NOT steer — it calls processMessage + // instead. We can't call processMessage without full init, but we can + // verify no steer occurred. + _ = messages.SendMsg{Content: "hello"} + assert.Empty(t, rt.steered) +} diff --git a/pkg/tui/page/chat/runtime_events.go b/pkg/tui/page/chat/runtime_events.go index 874f5e09e..6942be47a 100644 --- a/pkg/tui/page/chat/runtime_events.go +++ b/pkg/tui/page/chat/runtime_events.go @@ -256,7 +256,11 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd p.streamCancelled = false spinnerCmd := p.setWorking(false) p.setPendingResponse(false) - queueCmd := p.processNextQueuedMessage() + + // Clear the display-only shadow queue; all steered messages have been + // consumed by the runtime loop at this point. + p.messageQueue = nil + p.syncQueueToSidebar() var exitCmd tea.Cmd if p.app.ShouldExitAfterFirstResponse() && p.hasReceivedAssistantContent { @@ -266,7 +270,7 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd }) } - return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, queueCmd, exitCmd) + return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, exitCmd) } // handlePartialToolCall processes partial tool call events by rendering each diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index 38c3bc422..47446c429 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -2201,7 +2201,7 @@ func (m *appModel) renderLeanWorkingIndicator() string { innerWidth := m.width - appPaddingHorizontal workingText := "Working\u2026" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working\u2026 (%d queued)", queueLen) + workingText = fmt.Sprintf("Working\u2026 (%d steered)", queueLen) } line := m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) return lipgloss.NewStyle().Padding(0, styles.AppPadding).Width(innerWidth + appPaddingHorizontal).Render(line) @@ -2238,7 +2238,7 @@ func (m *appModel) renderResizeHandle(width int) string { // Truncate right side and append spinner (handle stays centered) workingText := "Working…" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working… (%d queued)", queueLen) + workingText = fmt.Sprintf("Working… (%d steered)", queueLen) } suffix := " " + m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) cancelKeyPart := styles.HighlightWhiteStyle.Render("Esc") @@ -2247,7 +2247,7 @@ func (m *appModel) renderResizeHandle(width int) string { result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix case m.chatPage.QueueLength() > 0: - queueText := fmt.Sprintf("%d queued", m.chatPage.QueueLength()) + queueText := fmt.Sprintf("%d steered", m.chatPage.QueueLength()) suffix := " " + styles.WarningStyle.Render(queueText) + " " suffixWidth := lipgloss.Width(suffix) result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix From d4feddfaef4c39336513f3bb4926201341eff264 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Wed, 15 Apr 2026 14:13:35 +0200 Subject: [PATCH 2/4] Fix steered messages losing attachments and steer queue lifecycle issues Address review feedback: pass attachments through App.Steer so images/PDFs are not silently dropped, add ClearSteerQueue to the Runtime interface so Ctrl+X actually drains the runtime queue, and drain orphaned steer messages at the start of RunStream to prevent cross-run leakage on cancellation. --- pkg/app/app.go | 38 +++++++++++++++++++++++--- pkg/app/app_test.go | 1 + pkg/cli/runner_test.go | 1 + pkg/runtime/commands_test.go | 1 + pkg/runtime/loop.go | 4 +++ pkg/runtime/remote_runtime.go | 4 +++ pkg/runtime/runtime.go | 9 +++++++ pkg/tui/page/chat/chat.go | 7 ++--- pkg/tui/page/chat/queue_test.go | 47 +++++++++++++++++++++++++++------ 9 files changed, 98 insertions(+), 14 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index 307942d9d..7d85c3c1c 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -342,9 +342,41 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string // Steer enqueues a user message for mid-turn injection into the running agent // loop. The agent will see the message at the next tool-round boundary. Returns -// an error if the steer queue is full. -func (a *App) Steer(content string) error { - return a.runtime.Steer(runtime.QueuedMessage{Content: content}) +// an error if the steer queue is full. Attachments are processed into +// MultiContent parts so the model can see images/PDFs alongside the text. +func (a *App) Steer(content string, attachments []messages.Attachment) error { + msg := runtime.QueuedMessage{Content: content} + + if len(attachments) > 0 { + ctx := context.Background() + var textBuilder strings.Builder + textBuilder.WriteString(content) + + var binaryParts []chat.MessagePart + + for _, att := range attachments { + switch { + case att.FilePath != "": + a.processFileAttachment(ctx, att, &textBuilder, &binaryParts) + case att.Content != "": + a.processInlineAttachment(att, &textBuilder) + default: + slog.Debug("skipping attachment with no file path or content", "name", att.Name) + } + } + + msg.Content = textBuilder.String() + if len(binaryParts) > 0 { + msg.MultiContent = binaryParts + } + } + + return a.runtime.Steer(msg) +} + +// ClearSteerQueue drains all pending messages from the runtime's steer queue. +func (a *App) ClearSteerQueue() { + a.runtime.ClearSteerQueue() } // processFileAttachment reads a file from disk, classifies it, and either diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index 2f32cff20..5c6255344 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -68,6 +68,7 @@ func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Stop() {} func (m *mockRuntime) Steer(_ runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(_ runtime.QueuedMessage) error { return nil } // Verify mockRuntime implements runtime.Runtime diff --git a/pkg/cli/runner_test.go b/pkg/cli/runner_test.go index 4f39c1d04..5983a4dfd 100644 --- a/pkg/cli/runner_test.go +++ b/pkg/cli/runner_test.go @@ -61,6 +61,7 @@ func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, stri func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Steer(runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(runtime.QueuedMessage) error { return nil } func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} diff --git a/pkg/runtime/commands_test.go b/pkg/runtime/commands_test.go index 00e4a2195..32a89cc3b 100644 --- a/pkg/runtime/commands_test.go +++ b/pkg/runtime/commands_test.go @@ -70,6 +70,7 @@ func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, stri func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Steer(QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(QueuedMessage) error { return nil } func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan Event) { diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index bebcc0d7f..f88eaa5ee 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -75,6 +75,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c events := make(chan Event, 128) go func() { + // Drain any orphaned steer messages from a previous cancelled run + // so they don't leak into this new stream. + r.steerQueue.Drain(context.Background()) + telemetry.RecordSessionStart(ctx, r.CurrentAgentName(), sess.ID) ctx, sessionSpan := r.startSpan(ctx, "runtime.session", trace.WithAttributes( diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 607397016..3119ac8a5 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -222,6 +222,10 @@ func (r *RemoteRuntime) Steer(msg QueuedMessage) error { }) } +// ClearSteerQueue is a no-op for remote runtimes — steered messages are +// forwarded to the server and there is no local queue to drain. +func (r *RemoteRuntime) ClearSteerQueue() {} + // FollowUp enqueues a message for end-of-turn processing on the remote server. func (r *RemoteRuntime) FollowUp(msg QueuedMessage) error { if r.sessionID == "" { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 711607615..ccb3f20c6 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -143,6 +143,9 @@ type Runtime interface { // running agent loop. Returns an error if the queue is full or steering // is not available. Steer(msg QueuedMessage) error + // ClearSteerQueue drains all pending messages from the steer queue, + // discarding them. Used when the user explicitly clears the queue. + ClearSteerQueue() // FollowUp enqueues a message for end-of-turn processing. Each follow-up // gets a full undivided agent turn. Returns an error if the queue is full. FollowUp(msg QueuedMessage) error @@ -1059,6 +1062,12 @@ func (r *LocalRuntime) Steer(msg QueuedMessage) error { return nil } +// ClearSteerQueue drains all pending messages from the steer queue, +// discarding them. This is safe to call concurrently with the agent loop. +func (r *LocalRuntime) ClearSteerQueue() { + r.steerQueue.Drain(context.Background()) +} + // FollowUp enqueues a message to be processed after the current agent turn // finishes. Unlike Steer, follow-ups are popped one at a time and each gets // a full undivided agent turn. diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index c00aab8a7..920037b19 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -686,7 +686,7 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) { // Steer the message into the running agent loop. The runtime injects it // at the next tool-round boundary so the model sees it mid-turn. - if err := p.app.Steer(msg.Content); err != nil { + if err := p.app.Steer(msg.Content, msg.Attachments); err != nil { return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.") } @@ -821,8 +821,8 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach return attachments } -// handleClearQueue clears the display-only queue of steered messages and shows a notification. -// Note: messages already delivered to the runtime's steer queue cannot be recalled. +// handleClearQueue clears both the display queue and the runtime's steer +// queue so no pending messages are injected into the agent loop. func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { count := len(p.messageQueue) if count == 0 { @@ -831,6 +831,7 @@ func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { p.messageQueue = nil p.syncQueueToSidebar() + p.app.ClearSteerQueue() var msg string if count == 1 { diff --git a/pkg/tui/page/chat/queue_test.go b/pkg/tui/page/chat/queue_test.go index 7e82c0f86..c8cab346f 100644 --- a/pkg/tui/page/chat/queue_test.go +++ b/pkg/tui/page/chat/queue_test.go @@ -22,8 +22,9 @@ import ( // steerRuntime is a minimal runtime.Runtime for testing steer behaviour. type steerRuntime struct { - steered []runtime.QueuedMessage - steerFn func(runtime.QueuedMessage) error // optional override + steered []runtime.QueuedMessage + steerFn func(runtime.QueuedMessage) error // optional override + steerCleared int // number of ClearSteerQueue calls } func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error { @@ -34,6 +35,10 @@ func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error { return nil } +func (r *steerRuntime) ClearSteerQueue() { + r.steerCleared++ +} + // Remaining interface methods — no-ops for this test. func (r *steerRuntime) CurrentAgentName() string { return "test" } @@ -171,10 +176,10 @@ func TestSteer_QueueFull_RejectsMessage(t *testing.T) { assert.Len(t, p.messageQueue, 3) } -func TestSteer_ClearQueue(t *testing.T) { +func TestSteer_ClearQueue_AlsoClearsRuntime(t *testing.T) { t.Parallel() - p, _ := newTestChatPage(t) + p, rt := newTestChatPage(t) // Steer some messages p.handleSendMsg(messages.SendMsg{Content: "first"}) @@ -183,15 +188,41 @@ func TestSteer_ClearQueue(t *testing.T) { require.Len(t, p.messageQueue, 3) - // Clear the display queue + // Clear the display queue — should also drain the runtime steer queue _, cmd := p.handleClearQueue() assert.Empty(t, p.messageQueue) - assert.NotNil(t, cmd) // Success notification + assert.NotNil(t, cmd) // Success notification + assert.Equal(t, 1, rt.steerCleared) // runtime queue was drained - // Clearing empty queue + // Clearing empty queue should NOT call ClearSteerQueue _, cmd = p.handleClearQueue() assert.Empty(t, p.messageQueue) - assert.NotNil(t, cmd) // Info notification + assert.NotNil(t, cmd) // Info notification + assert.Equal(t, 1, rt.steerCleared) // unchanged — no extra drain +} + +func TestSteer_BusyAgent_PassesAttachments(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + + // Send a message with an inline (pasted text) attachment while busy. + // File-reference attachments require real files on disk so we only + // test inline content here. + msg := messages.SendMsg{ + Content: "check this", + Attachments: []messages.Attachment{ + {Name: "paste-1", Content: "some pasted text"}, + }, + } + _, cmd := p.handleSendMsg(msg) + assert.NotNil(t, cmd) + + // The runtime should have received the steered message with the + // inline attachment text appended to Content. + require.Len(t, rt.steered, 1) + assert.Contains(t, rt.steered[0].Content, "check this") + assert.Contains(t, rt.steered[0].Content, "some pasted text") } func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) { From 31bdea1e04162fa493a2d5ee33ccafb842b2c7cd Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Tue, 21 Apr 2026 10:04:01 +0200 Subject: [PATCH 3/4] Add FollowupBehavior setting to choose steer vs follow-up dispatch PR #2405 made the TUI unconditionally use runtime.Steer for messages sent while the agent is working, injecting them mid-turn via at the next tool-round boundary. Some users prefer the previous follow-up semantics where each queued message runs as its own undivided turn after the current one completes. Add a user-config setting (Settings.FollowupBehavior, "steer" | "followup", default "steer") that selects between the two modes. The runtime already supports both paths (Steer / FollowUp); only the TUI side needed rewiring. A new /followup-behavior slash command toggles and persists it at runtime, mirroring the existing /split-diff pattern. Changes: - userconfig: add FollowupBehavior field + GetFollowupBehavior() normalizer + constants. Default is "steer". - runtime: add ClearFollowUpQueue symmetric to ClearSteerQueue on the Runtime interface, LocalRuntime (drains the in-memory queue), and RemoteRuntime (no-op, matching the steer no-op). - app: add App.FollowUp and App.ClearFollowUpQueue mirroring their steer counterparts. Extract buildQueuedMessage so both paths share the same attachment handling (files, images/PDFs, pasted inline text). - TUI chat: branch handleSendMsg on the configured behavior; clear both runtime queues in handleClearQueue so a mode switch can't leak messages. - TUI tui.go: make the working-indicator label reflect the active mode ("N steered" vs "N queued") instead of hardcoding "steered". - Slash command: /followup-behavior steer|followup with a plain argument shows, sets, or rejects invalid values via toast notifications. - Tests: unit tests for GetFollowupBehavior normalization + TUI tests for the follow-up dispatch path, follow-up queue-full handling, attachment pass-through, and unconditional dual-queue clearing. This PR is stacked on the two commits from #2405 to provide an integrated diff. It will rebase cleanly once #2405 merges. --- pkg/app/app.go | 65 +++++++++------ pkg/app/app_test.go | 1 + pkg/cli/runner_test.go | 1 + pkg/runtime/commands_test.go | 1 + pkg/runtime/remote_runtime.go | 4 + pkg/runtime/runtime.go | 9 +++ pkg/tui/commands/commands.go | 11 +++ pkg/tui/handlers.go | 48 +++++++++++ pkg/tui/messages/session.go | 6 ++ pkg/tui/page/chat/chat.go | 35 ++++++-- pkg/tui/page/chat/queue_test.go | 130 ++++++++++++++++++++++++++++-- pkg/tui/tui.go | 19 ++++- pkg/userconfig/userconfig.go | 30 +++++++ pkg/userconfig/userconfig_test.go | 25 ++++++ 14 files changed, 347 insertions(+), 38 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index 7d85c3c1c..c78c49306 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -340,38 +340,45 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string }() } -// Steer enqueues a user message for mid-turn injection into the running agent -// loop. The agent will see the message at the next tool-round boundary. Returns -// an error if the steer queue is full. Attachments are processed into +// buildQueuedMessage processes a text/attachment pair into a runtime.QueuedMessage +// suitable for either steering or follow-up. Attachments are rendered into // MultiContent parts so the model can see images/PDFs alongside the text. -func (a *App) Steer(content string, attachments []messages.Attachment) error { +func (a *App) buildQueuedMessage(content string, attachments []messages.Attachment) runtime.QueuedMessage { msg := runtime.QueuedMessage{Content: content} - if len(attachments) > 0 { - ctx := context.Background() - var textBuilder strings.Builder - textBuilder.WriteString(content) + if len(attachments) == 0 { + return msg + } - var binaryParts []chat.MessagePart + ctx := context.Background() + var textBuilder strings.Builder + textBuilder.WriteString(content) - for _, att := range attachments { - switch { - case att.FilePath != "": - a.processFileAttachment(ctx, att, &textBuilder, &binaryParts) - case att.Content != "": - a.processInlineAttachment(att, &textBuilder) - default: - slog.Debug("skipping attachment with no file path or content", "name", att.Name) - } - } + var binaryParts []chat.MessagePart - msg.Content = textBuilder.String() - if len(binaryParts) > 0 { - msg.MultiContent = binaryParts + for _, att := range attachments { + switch { + case att.FilePath != "": + a.processFileAttachment(ctx, att, &textBuilder, &binaryParts) + case att.Content != "": + a.processInlineAttachment(att, &textBuilder) + default: + slog.Debug("skipping attachment with no file path or content", "name", att.Name) } } - return a.runtime.Steer(msg) + msg.Content = textBuilder.String() + if len(binaryParts) > 0 { + msg.MultiContent = binaryParts + } + return msg +} + +// Steer enqueues a user message for mid-turn injection into the running agent +// loop. The agent will see the message at the next tool-round boundary. Returns +// an error if the steer queue is full. +func (a *App) Steer(content string, attachments []messages.Attachment) error { + return a.runtime.Steer(a.buildQueuedMessage(content, attachments)) } // ClearSteerQueue drains all pending messages from the runtime's steer queue. @@ -379,6 +386,18 @@ func (a *App) ClearSteerQueue() { a.runtime.ClearSteerQueue() } +// FollowUp enqueues a user message for end-of-turn processing. Each follow-up +// gets a full undivided agent turn after the current turn completes. Returns +// an error if the follow-up queue is full. +func (a *App) FollowUp(content string, attachments []messages.Attachment) error { + return a.runtime.FollowUp(a.buildQueuedMessage(content, attachments)) +} + +// ClearFollowUpQueue drains all pending messages from the runtime's follow-up queue. +func (a *App) ClearFollowUpQueue() { + a.runtime.ClearFollowUpQueue() +} + // processFileAttachment reads a file from disk, classifies it, and either // appends its text content to textBuilder or adds a binary part to binaryParts. func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) { diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index 5c6255344..1e40c3bb7 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -70,6 +70,7 @@ func (m *mockRuntime) Stop() {} func (m *mockRuntime) Steer(_ runtime.QueuedMessage) error { return nil } func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(_ runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} // Verify mockRuntime implements runtime.Runtime var _ runtime.Runtime = (*mockRuntime)(nil) diff --git a/pkg/cli/runner_test.go b/pkg/cli/runner_test.go index 5983a4dfd..04686cee6 100644 --- a/pkg/cli/runner_test.go +++ b/pkg/cli/runner_test.go @@ -63,6 +63,7 @@ func (m *mockRuntime) Close() error func (m *mockRuntime) Steer(runtime.QueuedMessage) error { return nil } func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} func (m *mockRuntime) Resume(_ context.Context, req runtime.ResumeRequest) { diff --git a/pkg/runtime/commands_test.go b/pkg/runtime/commands_test.go index 32a89cc3b..182ef8e17 100644 --- a/pkg/runtime/commands_test.go +++ b/pkg/runtime/commands_test.go @@ -72,6 +72,7 @@ func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Steer(QueuedMessage) error { return nil } func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan Event) { } diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 3119ac8a5..f7997bdd5 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -226,6 +226,10 @@ func (r *RemoteRuntime) Steer(msg QueuedMessage) error { // forwarded to the server and there is no local queue to drain. func (r *RemoteRuntime) ClearSteerQueue() {} +// ClearFollowUpQueue is a no-op for remote runtimes — follow-up messages are +// forwarded to the server and there is no local queue to drain. +func (r *RemoteRuntime) ClearFollowUpQueue() {} + // FollowUp enqueues a message for end-of-turn processing on the remote server. func (r *RemoteRuntime) FollowUp(msg QueuedMessage) error { if r.sessionID == "" { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index ccb3f20c6..d8b07b51f 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -149,6 +149,9 @@ type Runtime interface { // FollowUp enqueues a message for end-of-turn processing. Each follow-up // gets a full undivided agent turn. Returns an error if the queue is full. FollowUp(msg QueuedMessage) error + // ClearFollowUpQueue drains all pending messages from the follow-up queue, + // discarding them. Used when the user explicitly clears the queue. + ClearFollowUpQueue() // Close releases resources held by the runtime (e.g., session store connections). Close() error @@ -1078,6 +1081,12 @@ func (r *LocalRuntime) FollowUp(msg QueuedMessage) error { return nil } +// ClearFollowUpQueue drains all pending messages from the follow-up queue, +// discarding them. This is safe to call concurrently with the agent loop. +func (r *LocalRuntime) ClearFollowUpQueue() { + r.followUpQueue.Drain(context.Background()) +} + // Run starts the agent's interaction loop func (r *LocalRuntime) startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { diff --git a/pkg/tui/commands/commands.go b/pkg/tui/commands/commands.go index 17686001f..81e051890 100644 --- a/pkg/tui/commands/commands.go +++ b/pkg/tui/commands/commands.go @@ -313,6 +313,17 @@ func builtInSettingsCommands() []Item { return core.CmdHandler(messages.OpenThemePickerMsg{}) }, }, + { + ID: "settings.followup-behavior", + Label: "Follow-up Behavior", + SlashCommand: "/followup-behavior", + Description: "Set behavior for messages sent while agent is working (usage: /followup-behavior steer|followup)", + Category: "Settings", + Immediate: true, + Execute: func(arg string) tea.Cmd { + return core.CmdHandler(messages.SetFollowupBehaviorMsg{Mode: strings.TrimSpace(arg)}) + }, + }, } } diff --git a/pkg/tui/handlers.go b/pkg/tui/handlers.go index d0585711f..0c765b4d1 100644 --- a/pkg/tui/handlers.go +++ b/pkg/tui/handlers.go @@ -351,6 +351,54 @@ func (m *appModel) handleToggleSplitDiff() (tea.Model, tea.Cmd) { return m, tea.Batch(cmds...) } +// handleSetFollowupBehavior sets the follow-up behavior persistently. When +// called with an empty mode it reports the current value. Invalid modes +// produce a warning notification listing the accepted values. +func (m *appModel) handleSetFollowupBehavior(mode string) (tea.Model, tea.Cmd) { + mode = strings.ToLower(strings.TrimSpace(mode)) + + current := userconfig.Get().GetFollowupBehavior() + if mode == "" { + return m, notification.InfoCmd(fmt.Sprintf( + "Follow-up behavior: %s · usage: /followup-behavior steer|followup", current)) + } + + if mode != userconfig.FollowupBehaviorSteer && mode != userconfig.FollowupBehaviorFollowUp { + return m, notification.WarningCmd(fmt.Sprintf( + "Unknown follow-up behavior %q. Valid values: %s, %s", + mode, userconfig.FollowupBehaviorSteer, userconfig.FollowupBehaviorFollowUp)) + } + + if mode == current { + return m, notification.InfoCmd(fmt.Sprintf("Follow-up behavior already set to %q", mode)) + } + + // Persist to global userconfig (fire-and-forget, matching the split-diff pattern). + go func() { + cfg, err := userconfig.Load() + if err != nil { + slog.Warn("Failed to load userconfig for follow-up behavior change", "error", err) + return + } + if cfg.Settings == nil { + cfg.Settings = &userconfig.Settings{} + } + cfg.Settings.FollowupBehavior = mode + if err := cfg.Save(); err != nil { + slog.Warn("Failed to persist follow-up behavior to userconfig", "error", err) + } + }() + + var description string + switch mode { + case userconfig.FollowupBehaviorSteer: + description = "messages sent while working will be injected mid-turn" + case userconfig.FollowupBehaviorFollowUp: + description = "messages sent while working will queue as their own turn" + } + return m, notification.SuccessCmd(fmt.Sprintf("Follow-up behavior set to %q · %s", mode, description)) +} + // --- Dialogs --- func (m *appModel) handleShowCostDialog() (tea.Model, tea.Cmd) { diff --git a/pkg/tui/messages/session.go b/pkg/tui/messages/session.go index 8e4a0c405..7c5ad097f 100644 --- a/pkg/tui/messages/session.go +++ b/pkg/tui/messages/session.go @@ -80,6 +80,12 @@ type ( // ToggleSplitDiffMsg toggles split diff view mode. ToggleSplitDiffMsg struct{} + // SetFollowupBehaviorMsg sets the follow-up behavior (how messages + // submitted while the agent is working are dispatched). Mode is the raw + // user-supplied value; empty string means "show current value". Unknown + // values are rejected by the handler. + SetFollowupBehaviorMsg struct{ Mode string } + // SendMsg contains the content sent to the agent. SendMsg struct { Content string // Full content sent to the agent (with file contents expanded) diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index 920037b19..efcc83380 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -26,6 +26,7 @@ import ( msgtypes "github.com/docker/docker-agent/pkg/tui/messages" "github.com/docker/docker-agent/pkg/tui/service" "github.com/docker/docker-agent/pkg/tui/styles" + "github.com/docker/docker-agent/pkg/userconfig" ) const ( @@ -684,10 +685,27 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) { return p, cmd } - // Steer the message into the running agent loop. The runtime injects it - // at the next tool-round boundary so the model sees it mid-turn. - if err := p.app.Steer(msg.Content, msg.Attachments); err != nil { - return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.") + // Dispatch the message according to the configured follow-up behavior. + // "steer" (default) injects it mid-turn via at the next + // tool-round boundary. "followup" enqueues it as its own undivided turn + // after the current one completes. + var ( + dispatchErr error + errNotice string + okNotice string + ) + switch userconfig.Get().GetFollowupBehavior() { + case userconfig.FollowupBehaviorFollowUp: + dispatchErr = p.app.FollowUp(msg.Content, msg.Attachments) + errNotice = "Follow-up queue full. Please wait for the agent to catch up." + okNotice = "Message queued · will run as its own turn" + default: + dispatchErr = p.app.Steer(msg.Content, msg.Attachments) + errNotice = "Steer queue full (max 5). Please wait for the agent to catch up." + okNotice = "Message steered · agent will see it at the next step" + } + if dispatchErr != nil { + return p, notification.WarningCmd(errNotice) } // Track for sidebar display; cleared when the stream stops. @@ -697,7 +715,7 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) { }) p.syncQueueToSidebar() - return p, notification.InfoCmd("Message steered · agent will see it at the next step") + return p, notification.InfoCmd(okNotice) } func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) { @@ -821,8 +839,10 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach return attachments } -// handleClearQueue clears both the display queue and the runtime's steer -// queue so no pending messages are injected into the agent loop. +// handleClearQueue clears the display queue and both runtime queues (steer and +// follow-up) so no pending messages are injected into the agent loop. Both are +// drained unconditionally because the user may have switched modes after +// queueing, or a session may hold pending messages of either kind. func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { count := len(p.messageQueue) if count == 0 { @@ -832,6 +852,7 @@ func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { p.messageQueue = nil p.syncQueueToSidebar() p.app.ClearSteerQueue() + p.app.ClearFollowUpQueue() var msg string if count == 1 { diff --git a/pkg/tui/page/chat/queue_test.go b/pkg/tui/page/chat/queue_test.go index c8cab346f..2522e6a2d 100644 --- a/pkg/tui/page/chat/queue_test.go +++ b/pkg/tui/page/chat/queue_test.go @@ -18,13 +18,17 @@ import ( "github.com/docker/docker-agent/pkg/tui/components/sidebar" "github.com/docker/docker-agent/pkg/tui/messages" "github.com/docker/docker-agent/pkg/tui/service" + "github.com/docker/docker-agent/pkg/userconfig" ) -// steerRuntime is a minimal runtime.Runtime for testing steer behaviour. +// steerRuntime is a minimal runtime.Runtime for testing steer/follow-up behaviour. type steerRuntime struct { - steered []runtime.QueuedMessage - steerFn func(runtime.QueuedMessage) error // optional override - steerCleared int // number of ClearSteerQueue calls + steered []runtime.QueuedMessage + steerFn func(runtime.QueuedMessage) error // optional override + steerCleared int // number of ClearSteerQueue calls + followedUp []runtime.QueuedMessage + followUpFn func(runtime.QueuedMessage) error // optional override + followUpCleared int // number of ClearFollowUpQueue calls } func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error { @@ -99,7 +103,17 @@ func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (r *steerRuntime) Close() error { return nil } -func (r *steerRuntime) FollowUp(runtime.QueuedMessage) error { return nil } +func (r *steerRuntime) FollowUp(msg runtime.QueuedMessage) error { + if r.followUpFn != nil { + return r.followUpFn(msg) + } + r.followedUp = append(r.followedUp, msg) + return nil +} + +func (r *steerRuntime) ClearFollowUpQueue() { + r.followUpCleared++ +} func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} @@ -237,3 +251,109 @@ func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) { _ = messages.SendMsg{Content: "hello"} assert.Empty(t, rt.steered) } + +// setFollowupBehavior writes the global follow-up behavior to an isolated +// HOME-rooted config for the duration of a test. Not parallel-safe because it +// mutates HOME via t.Setenv. +func setFollowupBehavior(t *testing.T, behavior string) { + t.Helper() + home := t.TempDir() + t.Setenv("HOME", home) + + cfg, err := userconfig.Load() + require.NoError(t, err) + if cfg.Settings == nil { + cfg.Settings = &userconfig.Settings{} + } + cfg.Settings.FollowupBehavior = behavior + require.NoError(t, cfg.Save()) +} + +func TestFollowUp_BusyAgent_EnqueuesAsFollowUp(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + // Send while busy in follow-up mode — should call FollowUp, not Steer. + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "first"}) + assert.NotNil(t, cmd) + + assert.Empty(t, rt.steered, "steer path should not be used in follow-up mode") + require.Len(t, rt.followedUp, 1) + assert.Equal(t, "first", rt.followedUp[0].Content) + + // Display queue should still track messages for the sidebar. + require.Len(t, p.messageQueue, 1) + assert.Equal(t, "first", p.messageQueue[0].content) + + // Second message also follows the follow-up path. + _, _ = p.handleSendMsg(messages.SendMsg{Content: "second"}) + require.Len(t, rt.followedUp, 2) + assert.Empty(t, rt.steered) +} + +func TestFollowUp_QueueFull_RejectsMessage(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + rt.followUpFn = func(msg runtime.QueuedMessage) error { + if len(rt.followedUp) >= 2 { + return errors.New("follow-up queue full") + } + rt.followedUp = append(rt.followedUp, msg) + return nil + } + + // First two succeed. + for i := range 2 { + _, _ = p.handleSendMsg(messages.SendMsg{Content: "message"}) + assert.Len(t, rt.followedUp, i+1) + } + + // Third is rejected. + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"}) + assert.NotNil(t, cmd) // warning notification + assert.Len(t, rt.followedUp, 2) + // Display queue should not grow on rejection. + assert.Len(t, p.messageQueue, 2) +} + +func TestClearQueue_DrainsBothRuntimeQueues(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + + // Prime the display queue by sending two messages in (default) steer mode. + p.handleSendMsg(messages.SendMsg{Content: "first"}) + p.handleSendMsg(messages.SendMsg{Content: "second"}) + require.Len(t, p.messageQueue, 2) + + // Clear should drain both runtime queues unconditionally so switching + // modes after queueing does not leak messages. + _, cmd := p.handleClearQueue() + assert.Empty(t, p.messageQueue) + assert.NotNil(t, cmd) + assert.Equal(t, 1, rt.steerCleared) + assert.Equal(t, 1, rt.followUpCleared) +} + +func TestFollowUp_BusyAgent_PassesAttachments(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + msg := messages.SendMsg{ + Content: "check this", + Attachments: []messages.Attachment{ + {Name: "paste-1", Content: "some pasted text"}, + }, + } + _, cmd := p.handleSendMsg(msg) + assert.NotNil(t, cmd) + + require.Len(t, rt.followedUp, 1) + assert.Contains(t, rt.followedUp[0].Content, "check this") + assert.Contains(t, rt.followedUp[0].Content, "some pasted text") + assert.Empty(t, rt.steered) +} diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index 47446c429..9b390c6c9 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -898,6 +898,9 @@ func (m *appModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case messages.ToggleSplitDiffMsg: return m.handleToggleSplitDiff() + case messages.SetFollowupBehaviorMsg: + return m.handleSetFollowupBehavior(msg.Mode) + case messages.ClearQueueMsg: updated, cmd := m.chatPage.Update(msg) m.chatPage = updated.(chat.Page) @@ -2196,12 +2199,22 @@ func (m *appModel) handleEditorResize(y int) tea.Cmd { return nil } +// queueLabelForMode returns the word shown after a queue count in the working +// indicator. Matches the active follow-up behavior so it's truthful about +// what's actually happening to pending messages. +func queueLabelForMode() string { + if userconfig.Get().GetFollowupBehavior() == userconfig.FollowupBehaviorFollowUp { + return "queued" + } + return "steered" +} + // renderLeanWorkingIndicator renders a single-line working indicator for lean mode. func (m *appModel) renderLeanWorkingIndicator() string { innerWidth := m.width - appPaddingHorizontal workingText := "Working\u2026" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working\u2026 (%d steered)", queueLen) + workingText = fmt.Sprintf("Working\u2026 (%d %s)", queueLen, queueLabelForMode()) } line := m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) return lipgloss.NewStyle().Padding(0, styles.AppPadding).Width(innerWidth + appPaddingHorizontal).Render(line) @@ -2238,7 +2251,7 @@ func (m *appModel) renderResizeHandle(width int) string { // Truncate right side and append spinner (handle stays centered) workingText := "Working…" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working… (%d steered)", queueLen) + workingText = fmt.Sprintf("Working… (%d %s)", queueLen, queueLabelForMode()) } suffix := " " + m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) cancelKeyPart := styles.HighlightWhiteStyle.Render("Esc") @@ -2247,7 +2260,7 @@ func (m *appModel) renderResizeHandle(width int) string { result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix case m.chatPage.QueueLength() > 0: - queueText := fmt.Sprintf("%d steered", m.chatPage.QueueLength()) + queueText := fmt.Sprintf("%d %s", m.chatPage.QueueLength(), queueLabelForMode()) suffix := " " + styles.WarningStyle.Render(queueText) + " " suffixWidth := lipgloss.Width(suffix) result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix diff --git a/pkg/userconfig/userconfig.go b/pkg/userconfig/userconfig.go index b1b62e3bf..802e6a092 100644 --- a/pkg/userconfig/userconfig.go +++ b/pkg/userconfig/userconfig.go @@ -61,12 +61,28 @@ type Settings struct { // SoundThreshold is the minimum duration in seconds a task must run // before a success sound is played. Defaults to 5 seconds. SoundThreshold int `yaml:"sound_threshold,omitempty"` + // FollowupBehavior controls what happens when the user sends a message + // while the agent is still working. Valid values are "steer" (inject the + // message mid-turn via at the next tool-round boundary) + // and "followup" (enqueue the message as its own undivided turn after the + // current turn completes). Defaults to "steer". + FollowupBehavior string `yaml:"followup_behavior,omitempty"` // Permissions defines global permission patterns applied across all sessions // and agents. These act as user-wide defaults; session-level and agent-level // permissions override them. Permissions *latest.PermissionsConfig `yaml:"permissions,omitempty"` } +// FollowupBehavior values accepted by Settings.FollowupBehavior. +const ( + // FollowupBehaviorSteer injects messages mid-turn via runtime.Steer at the + // next tool-round boundary. This is the default. + FollowupBehaviorSteer = "steer" + // FollowupBehaviorFollowUp enqueues messages via runtime.FollowUp. Each + // queued message gets its own undivided turn after the current one completes. + FollowupBehaviorFollowUp = "followup" +) + // DefaultTabTitleMaxLength is the default maximum tab title length when not configured. const DefaultTabTitleMaxLength = 20 @@ -89,6 +105,20 @@ func (s *Settings) GetSound() bool { return s.Sound } +// GetFollowupBehavior returns the configured follow-up behavior, normalizing +// unset or invalid values to FollowupBehaviorSteer (the default). +func (s *Settings) GetFollowupBehavior() string { + if s == nil { + return FollowupBehaviorSteer + } + switch s.FollowupBehavior { + case FollowupBehaviorFollowUp: + return FollowupBehaviorFollowUp + default: + return FollowupBehaviorSteer + } +} + // GetSoundThreshold returns the minimum duration for sound notifications, defaulting to 10s. func (s *Settings) GetSoundThreshold() int { if s == nil || s.SoundThreshold <= 0 { diff --git a/pkg/userconfig/userconfig_test.go b/pkg/userconfig/userconfig_test.go index feb82d21a..a6994b7c8 100644 --- a/pkg/userconfig/userconfig_test.go +++ b/pkg/userconfig/userconfig_test.go @@ -890,3 +890,28 @@ func TestConfig_PermissionsRoundTrip(t *testing.T) { assert.Equal(t, original.Settings.Permissions.Deny, loaded.Settings.Permissions.Deny) assert.Equal(t, original.Settings.Permissions.Ask, loaded.Settings.Permissions.Ask) } + +func TestGetFollowupBehavior(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + settings *Settings + want string + }{ + {"nil settings default to steer", nil, FollowupBehaviorSteer}, + {"empty settings default to steer", &Settings{}, FollowupBehaviorSteer}, + {"unset field defaults to steer", &Settings{FollowupBehavior: ""}, FollowupBehaviorSteer}, + {"explicit steer", &Settings{FollowupBehavior: FollowupBehaviorSteer}, FollowupBehaviorSteer}, + {"explicit followup", &Settings{FollowupBehavior: FollowupBehaviorFollowUp}, FollowupBehaviorFollowUp}, + {"typo falls back to steer", &Settings{FollowupBehavior: "steeer"}, FollowupBehaviorSteer}, + {"case-sensitive: FOLLOWUP does not match", &Settings{FollowupBehavior: "FOLLOWUP"}, FollowupBehaviorSteer}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, tt.settings.GetFollowupBehavior()) + }) + } +} From 4f823e4ddae7b7932d8b803ce3b3471dc99c9ac0 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Tue, 21 Apr 2026 12:08:22 +0200 Subject: [PATCH 4/4] Drain follow-up queue on cancel and apply followup-behavior changes immediately Three review findings on the FollowupBehavior setting change: - HIGH: `LocalRuntime.RunStream` only drained the steer queue at startup, so a follow-up message enqueued in the previous (cancelled) session survived the cancel boundary and was dequeued as the first action of the next `RunStream` call. Drain both queues on entry. - HIGH: The TUI `StreamCancelledMsg` handler cleared the sidebar display queue but did not call `ClearSteerQueue`/`ClearFollowUpQueue` on the app, so pending messages sat in the runtime queues until the next run. Extract the drain into a `clearPendingQueues` helper and call it from both the explicit clear-queue action and the cancel handler. - MEDIUM: `/followup-behavior` persisted the new mode in a goroutine while `userconfig.Get` reads from disk on every call. Until the write committed, the dispatch switch and working-indicator label still saw the old value. Add `SetFollowupBehaviorOverride` as an in-memory override that `Get` consults, and set it synchronously in the handler before the background save, so the new mode is visible immediately. --- pkg/runtime/loop.go | 7 ++++-- pkg/tui/handlers.go | 7 ++++++ pkg/tui/page/chat/chat.go | 27 ++++++++++++++------- pkg/tui/page/chat/queue_test.go | 26 ++++++++++++++++++++ pkg/userconfig/userconfig.go | 40 +++++++++++++++++++++++++++++-- pkg/userconfig/userconfig_test.go | 40 +++++++++++++++++++++++++++++++ 6 files changed, 135 insertions(+), 12 deletions(-) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index f88eaa5ee..c9b032d40 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -75,9 +75,12 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c events := make(chan Event, 128) go func() { - // Drain any orphaned steer messages from a previous cancelled run - // so they don't leak into this new stream. + // Drain any orphaned messages from a previous cancelled run so + // they don't leak into this new stream. Both queues are drained: + // the user may have switched modes between sessions, or pending + // messages of either kind may have survived a cancel boundary. r.steerQueue.Drain(context.Background()) + r.followUpQueue.Drain(context.Background()) telemetry.RecordSessionStart(ctx, r.CurrentAgentName(), sess.ID) diff --git a/pkg/tui/handlers.go b/pkg/tui/handlers.go index 0c765b4d1..e9939a1c7 100644 --- a/pkg/tui/handlers.go +++ b/pkg/tui/handlers.go @@ -373,6 +373,13 @@ func (m *appModel) handleSetFollowupBehavior(mode string) (tea.Model, tea.Cmd) { return m, notification.InfoCmd(fmt.Sprintf("Follow-up behavior already set to %q", mode)) } + // Apply the change in-memory synchronously so userconfig.Get returns the + // new value immediately. Without this the send-dispatch and + // working-indicator readers below would race the background file write + // and keep returning the old mode until it commits — meaning a message + // sent right after the command could be dispatched under the old mode. + userconfig.SetFollowupBehaviorOverride(mode) + // Persist to global userconfig (fire-and-forget, matching the split-diff pattern). go func() { cfg, err := userconfig.Load() diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index efcc83380..3bbf3f830 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -404,10 +404,13 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) { } cmds = append(cmds, p.messages.ScrollToBottom()) - // Clear the display-only queue; steered messages that the runtime - // hasn't consumed yet are lost when the stream is cancelled. - p.messageQueue = nil - p.syncQueueToSidebar() + // Clear the display queue and both runtime queues so pending + // messages from the cancelled session can't leak into the next + // one. Follow-up messages in particular survive the cancel + // boundary inside the runtime's in-memory queue, so without an + // explicit drain they would be dequeued and executed as the + // first messages of the next RunStream call. + p.clearPendingQueues() return p, tea.Batch(cmds...) @@ -849,10 +852,7 @@ func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { return p, notification.InfoCmd("No messages queued") } - p.messageQueue = nil - p.syncQueueToSidebar() - p.app.ClearSteerQueue() - p.app.ClearFollowUpQueue() + p.clearPendingQueues() var msg string if count == 1 { @@ -863,6 +863,17 @@ func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { return p, notification.SuccessCmd(msg) } +// clearPendingQueues drops the display queue and drains both runtime +// queues (steer and follow-up). Used by the explicit clear-queue action +// and by the stream-cancel handler so messages left over from a +// cancelled session cannot leak into the next run. +func (p *chatPage) clearPendingQueues() { + p.messageQueue = nil + p.syncQueueToSidebar() + p.app.ClearSteerQueue() + p.app.ClearFollowUpQueue() +} + // syncQueueToSidebar updates the sidebar with truncated previews of queued messages. func (p *chatPage) syncQueueToSidebar() { previews := make([]string, len(p.messageQueue)) diff --git a/pkg/tui/page/chat/queue_test.go b/pkg/tui/page/chat/queue_test.go index 2522e6a2d..d0dd14a8b 100644 --- a/pkg/tui/page/chat/queue_test.go +++ b/pkg/tui/page/chat/queue_test.go @@ -338,6 +338,32 @@ func TestClearQueue_DrainsBothRuntimeQueues(t *testing.T) { assert.Equal(t, 1, rt.followUpCleared) } +// TestClearPendingQueues_StreamCancelled verifies the stream-cancel path +// drains both runtime queues so pending messages from the cancelled +// session cannot leak into the next RunStream call. Without this, a +// follow-up enqueued via FollowUp before Cancel would sit in the +// runtime's in-memory queue and be dequeued as the first action of the +// user's next session. +func TestClearPendingQueues_StreamCancelled(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + + // Simulate: the user dispatched messages while the agent was busy + // and then cancelled the stream. + p.handleSendMsg(messages.SendMsg{Content: "first"}) + p.handleSendMsg(messages.SendMsg{Content: "second"}) + require.Len(t, p.messageQueue, 2) + + p.clearPendingQueues() + + assert.Empty(t, p.messageQueue) + assert.Equal(t, 1, rt.steerCleared, + "cancel path must drain runtime steer queue") + assert.Equal(t, 1, rt.followUpCleared, + "cancel path must drain runtime follow-up queue") +} + func TestFollowUp_BusyAgent_PassesAttachments(t *testing.T) { setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) diff --git a/pkg/userconfig/userconfig.go b/pkg/userconfig/userconfig.go index 802e6a092..d1dfffefb 100644 --- a/pkg/userconfig/userconfig.go +++ b/pkg/userconfig/userconfig.go @@ -83,6 +83,32 @@ const ( FollowupBehaviorFollowUp = "followup" ) +// followupBehaviorOverride is an in-memory override applied by Get so that +// runtime changes to FollowupBehavior (via SetFollowupBehaviorOverride) are +// visible immediately, even while the on-disk config is being updated +// asynchronously by a background save. An empty string means no override. +var ( + followupBehaviorOverrideMu sync.RWMutex + followupBehaviorOverride string +) + +// SetFollowupBehaviorOverride sets an in-memory override for the follow-up +// behavior that is returned by subsequent calls to Get. Callers changing the +// setting at runtime should invoke this synchronously before persisting to +// disk so the new value is visible immediately, without racing the +// background file write. Pass an empty string to clear the override. +func SetFollowupBehaviorOverride(mode string) { + followupBehaviorOverrideMu.Lock() + followupBehaviorOverride = mode + followupBehaviorOverrideMu.Unlock() +} + +func getFollowupBehaviorOverride() string { + followupBehaviorOverrideMu.RLock() + defer followupBehaviorOverrideMu.RUnlock() + return followupBehaviorOverride +} + // DefaultTabTitleMaxLength is the default maximum tab title length when not configured. const DefaultTabTitleMaxLength = 20 @@ -371,10 +397,20 @@ func (c *Config) GetSettings() *Settings { // Get returns the global user settings from the config file. // Returns an empty Settings if the config file doesn't exist or has no settings. +// +// Any in-memory override set via SetFollowupBehaviorOverride is applied on +// top of the on-disk value so runtime changes are visible immediately, even +// when the disk write is still in flight. func Get() *Settings { + var s *Settings cfg, err := Load() if err != nil { - return &Settings{} + s = &Settings{} + } else { + s = cfg.GetSettings() + } + if ov := getFollowupBehaviorOverride(); ov != "" { + s.FollowupBehavior = ov } - return cfg.GetSettings() + return s } diff --git a/pkg/userconfig/userconfig_test.go b/pkg/userconfig/userconfig_test.go index a6994b7c8..66c7b300c 100644 --- a/pkg/userconfig/userconfig_test.go +++ b/pkg/userconfig/userconfig_test.go @@ -915,3 +915,43 @@ func TestGetFollowupBehavior(t *testing.T) { }) } } + +// TestSetFollowupBehaviorOverride verifies that setting the in-memory override +// makes Get return the new value without waiting for a disk write. This is +// the guarantee /followup-behavior relies on so messages sent right after +// the command are dispatched under the new mode instead of racing the +// asynchronous file save. +// +// Cannot run in parallel because it mutates the package-level override and +// HOME (via the test's config isolation). +func TestSetFollowupBehaviorOverride(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + + // Ensure a clean override at both the start and end of the test so it + // doesn't leak into other non-parallel tests. + SetFollowupBehaviorOverride("") + t.Cleanup(func() { SetFollowupBehaviorOverride("") }) + + // Baseline: no config, no override -> default steer. + assert.Equal(t, FollowupBehaviorSteer, Get().GetFollowupBehavior()) + + // Override to followup without touching disk — Get should return it. + SetFollowupBehaviorOverride(FollowupBehaviorFollowUp) + assert.Equal(t, FollowupBehaviorFollowUp, Get().GetFollowupBehavior()) + + // On-disk value set to steer, but override still wins. + cfg, err := Load() + require.NoError(t, err) + if cfg.Settings == nil { + cfg.Settings = &Settings{} + } + cfg.Settings.FollowupBehavior = FollowupBehaviorSteer + require.NoError(t, cfg.Save()) + assert.Equal(t, FollowupBehaviorFollowUp, Get().GetFollowupBehavior(), + "override should take precedence over on-disk value") + + // Clearing the override returns Get to reading from disk. + SetFollowupBehaviorOverride("") + assert.Equal(t, FollowupBehaviorSteer, Get().GetFollowupBehavior()) +}