diff --git a/pkg/app/app.go b/pkg/app/app.go index bd0637fec..c78c49306 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -340,6 +340,64 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string }() } +// buildQueuedMessage processes a text/attachment pair into a runtime.QueuedMessage +// suitable for either steering or follow-up. Attachments are rendered into +// MultiContent parts so the model can see images/PDFs alongside the text. +func (a *App) buildQueuedMessage(content string, attachments []messages.Attachment) runtime.QueuedMessage { + msg := runtime.QueuedMessage{Content: content} + + if len(attachments) == 0 { + return msg + } + + ctx := context.Background() + var textBuilder strings.Builder + textBuilder.WriteString(content) + + var binaryParts []chat.MessagePart + + for _, att := range attachments { + switch { + case att.FilePath != "": + a.processFileAttachment(ctx, att, &textBuilder, &binaryParts) + case att.Content != "": + a.processInlineAttachment(att, &textBuilder) + default: + slog.Debug("skipping attachment with no file path or content", "name", att.Name) + } + } + + msg.Content = textBuilder.String() + if len(binaryParts) > 0 { + msg.MultiContent = binaryParts + } + return msg +} + +// Steer enqueues a user message for mid-turn injection into the running agent +// loop. The agent will see the message at the next tool-round boundary. Returns +// an error if the steer queue is full. +func (a *App) Steer(content string, attachments []messages.Attachment) error { + return a.runtime.Steer(a.buildQueuedMessage(content, attachments)) +} + +// ClearSteerQueue drains all pending messages from the runtime's steer queue. +func (a *App) ClearSteerQueue() { + a.runtime.ClearSteerQueue() +} + +// FollowUp enqueues a user message for end-of-turn processing. Each follow-up +// gets a full undivided agent turn after the current turn completes. Returns +// an error if the follow-up queue is full. +func (a *App) FollowUp(content string, attachments []messages.Attachment) error { + return a.runtime.FollowUp(a.buildQueuedMessage(content, attachments)) +} + +// ClearFollowUpQueue drains all pending messages from the runtime's follow-up queue. +func (a *App) ClearFollowUpQueue() { + a.runtime.ClearFollowUpQueue() +} + // processFileAttachment reads a file from disk, classifies it, and either // appends its text content to textBuilder or adds a binary part to binaryParts. func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) { diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index 2f32cff20..1e40c3bb7 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -68,7 +68,9 @@ func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Stop() {} func (m *mockRuntime) Steer(_ runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(_ runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} // Verify mockRuntime implements runtime.Runtime var _ runtime.Runtime = (*mockRuntime)(nil) diff --git a/pkg/cli/runner_test.go b/pkg/cli/runner_test.go index 4f39c1d04..04686cee6 100644 --- a/pkg/cli/runner_test.go +++ b/pkg/cli/runner_test.go @@ -61,7 +61,9 @@ func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, stri func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Steer(runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} func (m *mockRuntime) Resume(_ context.Context, req runtime.ResumeRequest) { diff --git a/pkg/runtime/commands_test.go b/pkg/runtime/commands_test.go index 00e4a2195..182ef8e17 100644 --- a/pkg/runtime/commands_test.go +++ b/pkg/runtime/commands_test.go @@ -70,7 +70,9 @@ func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, stri func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Steer(QueuedMessage) error { return nil } +func (m *mockRuntime) ClearSteerQueue() {} func (m *mockRuntime) FollowUp(QueuedMessage) error { return nil } +func (m *mockRuntime) ClearFollowUpQueue() {} func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan Event) { } diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index bebcc0d7f..c9b032d40 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -75,6 +75,13 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c events := make(chan Event, 128) go func() { + // Drain any orphaned messages from a previous cancelled run so + // they don't leak into this new stream. Both queues are drained: + // the user may have switched modes between sessions, or pending + // messages of either kind may have survived a cancel boundary. + r.steerQueue.Drain(context.Background()) + r.followUpQueue.Drain(context.Background()) + telemetry.RecordSessionStart(ctx, r.CurrentAgentName(), sess.ID) ctx, sessionSpan := r.startSpan(ctx, "runtime.session", trace.WithAttributes( diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 607397016..f7997bdd5 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -222,6 +222,14 @@ func (r *RemoteRuntime) Steer(msg QueuedMessage) error { }) } +// ClearSteerQueue is a no-op for remote runtimes — steered messages are +// forwarded to the server and there is no local queue to drain. +func (r *RemoteRuntime) ClearSteerQueue() {} + +// ClearFollowUpQueue is a no-op for remote runtimes — follow-up messages are +// forwarded to the server and there is no local queue to drain. +func (r *RemoteRuntime) ClearFollowUpQueue() {} + // FollowUp enqueues a message for end-of-turn processing on the remote server. func (r *RemoteRuntime) FollowUp(msg QueuedMessage) error { if r.sessionID == "" { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 711607615..d8b07b51f 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -143,9 +143,15 @@ type Runtime interface { // running agent loop. Returns an error if the queue is full or steering // is not available. Steer(msg QueuedMessage) error + // ClearSteerQueue drains all pending messages from the steer queue, + // discarding them. Used when the user explicitly clears the queue. + ClearSteerQueue() // FollowUp enqueues a message for end-of-turn processing. Each follow-up // gets a full undivided agent turn. Returns an error if the queue is full. FollowUp(msg QueuedMessage) error + // ClearFollowUpQueue drains all pending messages from the follow-up queue, + // discarding them. Used when the user explicitly clears the queue. + ClearFollowUpQueue() // Close releases resources held by the runtime (e.g., session store connections). Close() error @@ -1059,6 +1065,12 @@ func (r *LocalRuntime) Steer(msg QueuedMessage) error { return nil } +// ClearSteerQueue drains all pending messages from the steer queue, +// discarding them. This is safe to call concurrently with the agent loop. +func (r *LocalRuntime) ClearSteerQueue() { + r.steerQueue.Drain(context.Background()) +} + // FollowUp enqueues a message to be processed after the current agent turn // finishes. Unlike Steer, follow-ups are popped one at a time and each gets // a full undivided agent turn. @@ -1069,6 +1081,12 @@ func (r *LocalRuntime) FollowUp(msg QueuedMessage) error { return nil } +// ClearFollowUpQueue drains all pending messages from the follow-up queue, +// discarding them. This is safe to call concurrently with the agent loop. +func (r *LocalRuntime) ClearFollowUpQueue() { + r.followUpQueue.Drain(context.Background()) +} + // Run starts the agent's interaction loop func (r *LocalRuntime) startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { diff --git a/pkg/tui/commands/commands.go b/pkg/tui/commands/commands.go index 17686001f..81e051890 100644 --- a/pkg/tui/commands/commands.go +++ b/pkg/tui/commands/commands.go @@ -313,6 +313,17 @@ func builtInSettingsCommands() []Item { return core.CmdHandler(messages.OpenThemePickerMsg{}) }, }, + { + ID: "settings.followup-behavior", + Label: "Follow-up Behavior", + SlashCommand: "/followup-behavior", + Description: "Set behavior for messages sent while agent is working (usage: /followup-behavior steer|followup)", + Category: "Settings", + Immediate: true, + Execute: func(arg string) tea.Cmd { + return core.CmdHandler(messages.SetFollowupBehaviorMsg{Mode: strings.TrimSpace(arg)}) + }, + }, } } diff --git a/pkg/tui/handlers.go b/pkg/tui/handlers.go index d0585711f..e9939a1c7 100644 --- a/pkg/tui/handlers.go +++ b/pkg/tui/handlers.go @@ -351,6 +351,61 @@ func (m *appModel) handleToggleSplitDiff() (tea.Model, tea.Cmd) { return m, tea.Batch(cmds...) } +// handleSetFollowupBehavior sets the follow-up behavior persistently. When +// called with an empty mode it reports the current value. Invalid modes +// produce a warning notification listing the accepted values. +func (m *appModel) handleSetFollowupBehavior(mode string) (tea.Model, tea.Cmd) { + mode = strings.ToLower(strings.TrimSpace(mode)) + + current := userconfig.Get().GetFollowupBehavior() + if mode == "" { + return m, notification.InfoCmd(fmt.Sprintf( + "Follow-up behavior: %s · usage: /followup-behavior steer|followup", current)) + } + + if mode != userconfig.FollowupBehaviorSteer && mode != userconfig.FollowupBehaviorFollowUp { + return m, notification.WarningCmd(fmt.Sprintf( + "Unknown follow-up behavior %q. Valid values: %s, %s", + mode, userconfig.FollowupBehaviorSteer, userconfig.FollowupBehaviorFollowUp)) + } + + if mode == current { + return m, notification.InfoCmd(fmt.Sprintf("Follow-up behavior already set to %q", mode)) + } + + // Apply the change in-memory synchronously so userconfig.Get returns the + // new value immediately. Without this the send-dispatch and + // working-indicator readers below would race the background file write + // and keep returning the old mode until it commits — meaning a message + // sent right after the command could be dispatched under the old mode. + userconfig.SetFollowupBehaviorOverride(mode) + + // Persist to global userconfig (fire-and-forget, matching the split-diff pattern). + go func() { + cfg, err := userconfig.Load() + if err != nil { + slog.Warn("Failed to load userconfig for follow-up behavior change", "error", err) + return + } + if cfg.Settings == nil { + cfg.Settings = &userconfig.Settings{} + } + cfg.Settings.FollowupBehavior = mode + if err := cfg.Save(); err != nil { + slog.Warn("Failed to persist follow-up behavior to userconfig", "error", err) + } + }() + + var description string + switch mode { + case userconfig.FollowupBehaviorSteer: + description = "messages sent while working will be injected mid-turn" + case userconfig.FollowupBehaviorFollowUp: + description = "messages sent while working will queue as their own turn" + } + return m, notification.SuccessCmd(fmt.Sprintf("Follow-up behavior set to %q · %s", mode, description)) +} + // --- Dialogs --- func (m *appModel) handleShowCostDialog() (tea.Model, tea.Cmd) { diff --git a/pkg/tui/messages/session.go b/pkg/tui/messages/session.go index 8e4a0c405..7c5ad097f 100644 --- a/pkg/tui/messages/session.go +++ b/pkg/tui/messages/session.go @@ -80,6 +80,12 @@ type ( // ToggleSplitDiffMsg toggles split diff view mode. ToggleSplitDiffMsg struct{} + // SetFollowupBehaviorMsg sets the follow-up behavior (how messages + // submitted while the agent is working are dispatched). Mode is the raw + // user-supplied value; empty string means "show current value". Unknown + // values are rejected by the handler. + SetFollowupBehaviorMsg struct{ Mode string } + // SendMsg contains the content sent to the agent. SendMsg struct { Content string // Full content sent to the agent (with file contents expanded) diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index b39d3b1ac..3bbf3f830 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -26,6 +26,7 @@ import ( msgtypes "github.com/docker/docker-agent/pkg/tui/messages" "github.com/docker/docker-agent/pkg/tui/service" "github.com/docker/docker-agent/pkg/tui/styles" + "github.com/docker/docker-agent/pkg/userconfig" ) const ( @@ -125,9 +126,6 @@ type queuedMessage struct { attachments []msgtypes.Attachment } -// maxQueuedMessages is the maximum number of messages that can be queued -const maxQueuedMessages = 5 - // chatPage implements Page type chatPage struct { width, height int @@ -406,10 +404,13 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) { } cmds = append(cmds, p.messages.ScrollToBottom()) - // Process next queued message after cancel (queue is preserved) - if queueCmd := p.processNextQueuedMessage(); queueCmd != nil { - cmds = append(cmds, queueCmd) - } + // Clear the display queue and both runtime queues so pending + // messages from the cancelled session can't leak into the next + // one. Follow-up messages in particular survive the cancel + // boundary inside the runtime's in-memory queue, so without an + // explicit drain they would be dequeued and executed as the + // first messages of the next RunStream call. + p.clearPendingQueues() return p, tea.Batch(cmds...) @@ -687,22 +688,37 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) { return p, cmd } - // If queue is full, reject the message - if len(p.messageQueue) >= maxQueuedMessages { - return p, notification.WarningCmd(fmt.Sprintf("Queue full (max %d messages). Please wait.", maxQueuedMessages)) + // Dispatch the message according to the configured follow-up behavior. + // "steer" (default) injects it mid-turn via at the next + // tool-round boundary. "followup" enqueues it as its own undivided turn + // after the current one completes. + var ( + dispatchErr error + errNotice string + okNotice string + ) + switch userconfig.Get().GetFollowupBehavior() { + case userconfig.FollowupBehaviorFollowUp: + dispatchErr = p.app.FollowUp(msg.Content, msg.Attachments) + errNotice = "Follow-up queue full. Please wait for the agent to catch up." + okNotice = "Message queued · will run as its own turn" + default: + dispatchErr = p.app.Steer(msg.Content, msg.Attachments) + errNotice = "Steer queue full (max 5). Please wait for the agent to catch up." + okNotice = "Message steered · agent will see it at the next step" + } + if dispatchErr != nil { + return p, notification.WarningCmd(errNotice) } - // Add to queue + // Track for sidebar display; cleared when the stream stops. p.messageQueue = append(p.messageQueue, queuedMessage{ content: msg.Content, attachments: msg.Attachments, }) p.syncQueueToSidebar() - queueLen := len(p.messageQueue) - notifyMsg := fmt.Sprintf("Message queued (%d waiting) · Ctrl+X to clear", queueLen) - - return p, notification.InfoCmd(notifyMsg) + return p, notification.InfoCmd(okNotice) } func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) { @@ -826,36 +842,17 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach return attachments } -// processNextQueuedMessage pops the next message from the queue and processes it. -// Returns nil if the queue is empty. -func (p *chatPage) processNextQueuedMessage() tea.Cmd { - if len(p.messageQueue) == 0 { - return nil - } - - // Pop the first message from the queue - queued := p.messageQueue[0] - p.messageQueue[0] = queuedMessage{} // zero out to allow GC - p.messageQueue = p.messageQueue[1:] - p.syncQueueToSidebar() - - msg := msgtypes.SendMsg{ - Content: queued.content, - Attachments: queued.attachments, - } - - return p.processMessage(msg) -} - -// handleClearQueue clears all queued messages and shows a notification. +// handleClearQueue clears the display queue and both runtime queues (steer and +// follow-up) so no pending messages are injected into the agent loop. Both are +// drained unconditionally because the user may have switched modes after +// queueing, or a session may hold pending messages of either kind. func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { count := len(p.messageQueue) if count == 0 { return p, notification.InfoCmd("No messages queued") } - p.messageQueue = nil - p.syncQueueToSidebar() + p.clearPendingQueues() var msg string if count == 1 { @@ -866,6 +863,17 @@ func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) { return p, notification.SuccessCmd(msg) } +// clearPendingQueues drops the display queue and drains both runtime +// queues (steer and follow-up). Used by the explicit clear-queue action +// and by the stream-cancel handler so messages left over from a +// cancelled session cannot leak into the next run. +func (p *chatPage) clearPendingQueues() { + p.messageQueue = nil + p.syncQueueToSidebar() + p.app.ClearSteerQueue() + p.app.ClearFollowUpQueue() +} + // syncQueueToSidebar updates the sidebar with truncated previews of queued messages. func (p *chatPage) syncQueueToSidebar() { previews := make([]string, len(p.messageQueue)) diff --git a/pkg/tui/page/chat/queue_test.go b/pkg/tui/page/chat/queue_test.go index 5a9ebffd7..d0dd14a8b 100644 --- a/pkg/tui/page/chat/queue_test.go +++ b/pkg/tui/page/chat/queue_test.go @@ -1,146 +1,385 @@ package chat import ( + "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/docker/docker-agent/pkg/app" + "github.com/docker/docker-agent/pkg/runtime" + "github.com/docker/docker-agent/pkg/session" + "github.com/docker/docker-agent/pkg/sessiontitle" + "github.com/docker/docker-agent/pkg/tools" + "github.com/docker/docker-agent/pkg/tools/builtin" + mcptools "github.com/docker/docker-agent/pkg/tools/mcp" "github.com/docker/docker-agent/pkg/tui/components/sidebar" "github.com/docker/docker-agent/pkg/tui/messages" "github.com/docker/docker-agent/pkg/tui/service" + "github.com/docker/docker-agent/pkg/userconfig" ) -// newTestChatPage creates a minimal chatPage for testing queue behavior. -// Note: This only initializes fields needed for queue testing. -// processMessage cannot be called without full initialization. -func newTestChatPage(t *testing.T) *chatPage { +// steerRuntime is a minimal runtime.Runtime for testing steer/follow-up behaviour. +type steerRuntime struct { + steered []runtime.QueuedMessage + steerFn func(runtime.QueuedMessage) error // optional override + steerCleared int // number of ClearSteerQueue calls + followedUp []runtime.QueuedMessage + followUpFn func(runtime.QueuedMessage) error // optional override + followUpCleared int // number of ClearFollowUpQueue calls +} + +func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error { + if r.steerFn != nil { + return r.steerFn(msg) + } + r.steered = append(r.steered, msg) + return nil +} + +func (r *steerRuntime) ClearSteerQueue() { + r.steerCleared++ +} + +// Remaining interface methods — no-ops for this test. + +func (r *steerRuntime) CurrentAgentName() string { return "test" } + +func (r *steerRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo { + return runtime.CurrentAgentInfo{} +} + +func (r *steerRuntime) SetCurrentAgent(string) error { return nil } + +func (r *steerRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) { + return nil, nil +} + +func (r *steerRuntime) EmitStartupInfo(_ context.Context, _ *session.Session, _ chan runtime.Event) { + // Do not close the channel — app.New's goroutine defers the close. +} + +func (r *steerRuntime) ResetStartupInfo() {} + +func (r *steerRuntime) RunStream(context.Context, *session.Session) <-chan runtime.Event { + ch := make(chan runtime.Event) + close(ch) + return ch +} + +func (r *steerRuntime) Run(context.Context, *session.Session) ([]session.Message, error) { + return nil, nil +} + +func (r *steerRuntime) Resume(context.Context, runtime.ResumeRequest) {} + +func (r *steerRuntime) ResumeElicitation(context.Context, tools.ElicitationAction, map[string]any) error { + return nil +} + +func (r *steerRuntime) SessionStore() session.Store { return nil } + +func (r *steerRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {} + +func (r *steerRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil } + +func (r *steerRuntime) CurrentAgentSkillsToolset() *builtin.SkillsToolset { return nil } + +func (r *steerRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo { + return nil +} + +func (r *steerRuntime) ExecuteMCPPrompt(context.Context, string, map[string]string) (string, error) { + return "", nil +} + +func (r *steerRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error { + return nil +} + +func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil } + +func (r *steerRuntime) Close() error { return nil } + +func (r *steerRuntime) FollowUp(msg runtime.QueuedMessage) error { + if r.followUpFn != nil { + return r.followUpFn(msg) + } + r.followedUp = append(r.followedUp, msg) + return nil +} + +func (r *steerRuntime) ClearFollowUpQueue() { + r.followUpCleared++ +} + +func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} + +// newTestChatPage creates a minimal chatPage for testing steer/queue behaviour. +func newTestChatPage(t *testing.T) (*chatPage, *steerRuntime) { t.Helper() sessionState := &service.SessionState{} + rt := &steerRuntime{} + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + a := app.New(ctx, rt, session.New()) + return &chatPage{ sidebar: sidebar.New(sessionState), sessionState: sessionState, - working: true, // Start busy so messages get queued - } + working: true, // Start busy so messages get steered + app: a, + }, rt } -func TestQueueFlow_BusyAgent_QueuesMessage(t *testing.T) { +func TestSteer_BusyAgent_SteersMessage(t *testing.T) { t.Parallel() - p := newTestChatPage(t) - // newTestChatPage already sets working=true + p, rt := newTestChatPage(t) - // Send first message while busy + // Send first message while busy — should steer to runtime msg1 := messages.SendMsg{Content: "first message"} _, cmd := p.handleSendMsg(msg1) + assert.NotNil(t, cmd) // notification command - // Should be queued + require.Len(t, rt.steered, 1) + assert.Equal(t, "first message", rt.steered[0].Content) + // Display queue should track the steered message require.Len(t, p.messageQueue, 1) assert.Equal(t, "first message", p.messageQueue[0].content) - // Command should be a notification (not processMessage) - assert.NotNil(t, cmd) - // Send second message while still busy + // Send second message msg2 := messages.SendMsg{Content: "second message"} _, _ = p.handleSendMsg(msg2) + require.Len(t, rt.steered, 2) + assert.Equal(t, "second message", rt.steered[1].Content) require.Len(t, p.messageQueue, 2) - assert.Equal(t, "first message", p.messageQueue[0].content) - assert.Equal(t, "second message", p.messageQueue[1].content) - - // Send third message - msg3 := messages.SendMsg{Content: "third message"} - _, _ = p.handleSendMsg(msg3) - - require.Len(t, p.messageQueue, 3) } -func TestQueueFlow_QueueFull_RejectsMessage(t *testing.T) { +func TestSteer_QueueFull_RejectsMessage(t *testing.T) { t.Parallel() - p := newTestChatPage(t) - // newTestChatPage sets working=true - - // Fill the queue to max - for i := range maxQueuedMessages { - msg := messages.SendMsg{Content: "message"} - _, _ = p.handleSendMsg(msg) - assert.Len(t, p.messageQueue, i+1) + p, rt := newTestChatPage(t) + + // Make the runtime's steer queue reject after the first call + calls := 0 + rt.steerFn = func(msg runtime.QueuedMessage) error { + calls++ + if calls > 3 { + return errors.New("steer queue full") + } + rt.steered = append(rt.steered, msg) + return nil } - require.Len(t, p.messageQueue, maxQueuedMessages) - - // Try to add one more - should be rejected - msg := messages.SendMsg{Content: "overflow message"} - _, cmd := p.handleSendMsg(msg) + // First 3 messages succeed + for i := range 3 { + _, _ = p.handleSendMsg(messages.SendMsg{Content: "message"}) + assert.Len(t, rt.steered, i+1) + } - // Queue size should not change - assert.Len(t, p.messageQueue, maxQueuedMessages) - // Should return a warning notification command - assert.NotNil(t, cmd) + // Fourth message should be rejected by the runtime + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"}) + assert.NotNil(t, cmd) // warning notification + assert.Len(t, rt.steered, 3) + // Display queue should not grow when steer fails + assert.Len(t, p.messageQueue, 3) } -func TestQueueFlow_PopFromQueue(t *testing.T) { +func TestSteer_ClearQueue_AlsoClearsRuntime(t *testing.T) { t.Parallel() - p := newTestChatPage(t) + p, rt := newTestChatPage(t) - // Queue some messages + // Steer some messages p.handleSendMsg(messages.SendMsg{Content: "first"}) p.handleSendMsg(messages.SendMsg{Content: "second"}) p.handleSendMsg(messages.SendMsg{Content: "third"}) require.Len(t, p.messageQueue, 3) - // Manually pop messages (simulating what processNextQueuedMessage does internally) - // Pop first - popped := p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] - p.syncQueueToSidebar() + // Clear the display queue — should also drain the runtime steer queue + _, cmd := p.handleClearQueue() + assert.Empty(t, p.messageQueue) + assert.NotNil(t, cmd) // Success notification + assert.Equal(t, 1, rt.steerCleared) // runtime queue was drained - assert.Equal(t, "first", popped.content) - require.Len(t, p.messageQueue, 2) - assert.Equal(t, "second", p.messageQueue[0].content) - assert.Equal(t, "third", p.messageQueue[1].content) + // Clearing empty queue should NOT call ClearSteerQueue + _, cmd = p.handleClearQueue() + assert.Empty(t, p.messageQueue) + assert.NotNil(t, cmd) // Info notification + assert.Equal(t, 1, rt.steerCleared) // unchanged — no extra drain +} + +func TestSteer_BusyAgent_PassesAttachments(t *testing.T) { + t.Parallel() - // Pop second - popped = p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] + p, rt := newTestChatPage(t) + + // Send a message with an inline (pasted text) attachment while busy. + // File-reference attachments require real files on disk so we only + // test inline content here. + msg := messages.SendMsg{ + Content: "check this", + Attachments: []messages.Attachment{ + {Name: "paste-1", Content: "some pasted text"}, + }, + } + _, cmd := p.handleSendMsg(msg) + assert.NotNil(t, cmd) + + // The runtime should have received the steered message with the + // inline attachment text appended to Content. + require.Len(t, rt.steered, 1) + assert.Contains(t, rt.steered[0].Content, "check this") + assert.Contains(t, rt.steered[0].Content, "some pasted text") +} + +func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + p.working = false // agent is idle + + // When idle, handleSendMsg should NOT steer — it calls processMessage + // instead. We can't call processMessage without full init, but we can + // verify no steer occurred. + _ = messages.SendMsg{Content: "hello"} + assert.Empty(t, rt.steered) +} - assert.Equal(t, "second", popped.content) +// setFollowupBehavior writes the global follow-up behavior to an isolated +// HOME-rooted config for the duration of a test. Not parallel-safe because it +// mutates HOME via t.Setenv. +func setFollowupBehavior(t *testing.T, behavior string) { + t.Helper() + home := t.TempDir() + t.Setenv("HOME", home) + + cfg, err := userconfig.Load() + require.NoError(t, err) + if cfg.Settings == nil { + cfg.Settings = &userconfig.Settings{} + } + cfg.Settings.FollowupBehavior = behavior + require.NoError(t, cfg.Save()) +} + +func TestFollowUp_BusyAgent_EnqueuesAsFollowUp(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + // Send while busy in follow-up mode — should call FollowUp, not Steer. + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "first"}) + assert.NotNil(t, cmd) + + assert.Empty(t, rt.steered, "steer path should not be used in follow-up mode") + require.Len(t, rt.followedUp, 1) + assert.Equal(t, "first", rt.followedUp[0].Content) + + // Display queue should still track messages for the sidebar. require.Len(t, p.messageQueue, 1) - assert.Equal(t, "third", p.messageQueue[0].content) + assert.Equal(t, "first", p.messageQueue[0].content) - // Pop last - popped = p.messageQueue[0] - p.messageQueue = p.messageQueue[1:] + // Second message also follows the follow-up path. + _, _ = p.handleSendMsg(messages.SendMsg{Content: "second"}) + require.Len(t, rt.followedUp, 2) + assert.Empty(t, rt.steered) +} - assert.Equal(t, "third", popped.content) - assert.Empty(t, p.messageQueue) +func TestFollowUp_QueueFull_RejectsMessage(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + rt.followUpFn = func(msg runtime.QueuedMessage) error { + if len(rt.followedUp) >= 2 { + return errors.New("follow-up queue full") + } + rt.followedUp = append(rt.followedUp, msg) + return nil + } + + // First two succeed. + for i := range 2 { + _, _ = p.handleSendMsg(messages.SendMsg{Content: "message"}) + assert.Len(t, rt.followedUp, i+1) + } + + // Third is rejected. + _, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"}) + assert.NotNil(t, cmd) // warning notification + assert.Len(t, rt.followedUp, 2) + // Display queue should not grow on rejection. + assert.Len(t, p.messageQueue, 2) } -func TestQueueFlow_ClearQueue(t *testing.T) { +func TestClearQueue_DrainsBothRuntimeQueues(t *testing.T) { t.Parallel() - p := newTestChatPage(t) - // newTestChatPage sets working=true + p, rt := newTestChatPage(t) - // Queue some messages + // Prime the display queue by sending two messages in (default) steer mode. p.handleSendMsg(messages.SendMsg{Content: "first"}) p.handleSendMsg(messages.SendMsg{Content: "second"}) - p.handleSendMsg(messages.SendMsg{Content: "third"}) - - require.Len(t, p.messageQueue, 3) + require.Len(t, p.messageQueue, 2) - // Clear the queue + // Clear should drain both runtime queues unconditionally so switching + // modes after queueing does not leak messages. _, cmd := p.handleClearQueue() - assert.Empty(t, p.messageQueue) - assert.NotNil(t, cmd) // Success notification + assert.NotNil(t, cmd) + assert.Equal(t, 1, rt.steerCleared) + assert.Equal(t, 1, rt.followUpCleared) +} + +// TestClearPendingQueues_StreamCancelled verifies the stream-cancel path +// drains both runtime queues so pending messages from the cancelled +// session cannot leak into the next RunStream call. Without this, a +// follow-up enqueued via FollowUp before Cancel would sit in the +// runtime's in-memory queue and be dequeued as the first action of the +// user's next session. +func TestClearPendingQueues_StreamCancelled(t *testing.T) { + t.Parallel() + + p, rt := newTestChatPage(t) + + // Simulate: the user dispatched messages while the agent was busy + // and then cancelled the stream. + p.handleSendMsg(messages.SendMsg{Content: "first"}) + p.handleSendMsg(messages.SendMsg{Content: "second"}) + require.Len(t, p.messageQueue, 2) + + p.clearPendingQueues() - // Clearing empty queue - _, cmd = p.handleClearQueue() assert.Empty(t, p.messageQueue) - assert.NotNil(t, cmd) // Info notification + assert.Equal(t, 1, rt.steerCleared, + "cancel path must drain runtime steer queue") + assert.Equal(t, 1, rt.followUpCleared, + "cancel path must drain runtime follow-up queue") +} + +func TestFollowUp_BusyAgent_PassesAttachments(t *testing.T) { + setFollowupBehavior(t, userconfig.FollowupBehaviorFollowUp) + + p, rt := newTestChatPage(t) + + msg := messages.SendMsg{ + Content: "check this", + Attachments: []messages.Attachment{ + {Name: "paste-1", Content: "some pasted text"}, + }, + } + _, cmd := p.handleSendMsg(msg) + assert.NotNil(t, cmd) + + require.Len(t, rt.followedUp, 1) + assert.Contains(t, rt.followedUp[0].Content, "check this") + assert.Contains(t, rt.followedUp[0].Content, "some pasted text") + assert.Empty(t, rt.steered) } diff --git a/pkg/tui/page/chat/runtime_events.go b/pkg/tui/page/chat/runtime_events.go index 874f5e09e..6942be47a 100644 --- a/pkg/tui/page/chat/runtime_events.go +++ b/pkg/tui/page/chat/runtime_events.go @@ -256,7 +256,11 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd p.streamCancelled = false spinnerCmd := p.setWorking(false) p.setPendingResponse(false) - queueCmd := p.processNextQueuedMessage() + + // Clear the display-only shadow queue; all steered messages have been + // consumed by the runtime loop at this point. + p.messageQueue = nil + p.syncQueueToSidebar() var exitCmd tea.Cmd if p.app.ShouldExitAfterFirstResponse() && p.hasReceivedAssistantContent { @@ -266,7 +270,7 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd }) } - return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, queueCmd, exitCmd) + return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, exitCmd) } // handlePartialToolCall processes partial tool call events by rendering each diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go index 38c3bc422..9b390c6c9 100644 --- a/pkg/tui/tui.go +++ b/pkg/tui/tui.go @@ -898,6 +898,9 @@ func (m *appModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case messages.ToggleSplitDiffMsg: return m.handleToggleSplitDiff() + case messages.SetFollowupBehaviorMsg: + return m.handleSetFollowupBehavior(msg.Mode) + case messages.ClearQueueMsg: updated, cmd := m.chatPage.Update(msg) m.chatPage = updated.(chat.Page) @@ -2196,12 +2199,22 @@ func (m *appModel) handleEditorResize(y int) tea.Cmd { return nil } +// queueLabelForMode returns the word shown after a queue count in the working +// indicator. Matches the active follow-up behavior so it's truthful about +// what's actually happening to pending messages. +func queueLabelForMode() string { + if userconfig.Get().GetFollowupBehavior() == userconfig.FollowupBehaviorFollowUp { + return "queued" + } + return "steered" +} + // renderLeanWorkingIndicator renders a single-line working indicator for lean mode. func (m *appModel) renderLeanWorkingIndicator() string { innerWidth := m.width - appPaddingHorizontal workingText := "Working\u2026" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working\u2026 (%d queued)", queueLen) + workingText = fmt.Sprintf("Working\u2026 (%d %s)", queueLen, queueLabelForMode()) } line := m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) return lipgloss.NewStyle().Padding(0, styles.AppPadding).Width(innerWidth + appPaddingHorizontal).Render(line) @@ -2238,7 +2251,7 @@ func (m *appModel) renderResizeHandle(width int) string { // Truncate right side and append spinner (handle stays centered) workingText := "Working…" if queueLen := m.chatPage.QueueLength(); queueLen > 0 { - workingText = fmt.Sprintf("Working… (%d queued)", queueLen) + workingText = fmt.Sprintf("Working… (%d %s)", queueLen, queueLabelForMode()) } suffix := " " + m.workingSpinner.View() + " " + styles.SpinnerDotsHighlightStyle.Render(workingText) cancelKeyPart := styles.HighlightWhiteStyle.Render("Esc") @@ -2247,7 +2260,7 @@ func (m *appModel) renderResizeHandle(width int) string { result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix case m.chatPage.QueueLength() > 0: - queueText := fmt.Sprintf("%d queued", m.chatPage.QueueLength()) + queueText := fmt.Sprintf("%d %s", m.chatPage.QueueLength(), queueLabelForMode()) suffix := " " + styles.WarningStyle.Render(queueText) + " " suffixWidth := lipgloss.Width(suffix) result = lipgloss.NewStyle().MaxWidth(innerWidth-suffixWidth).Render(fullLine) + suffix diff --git a/pkg/userconfig/userconfig.go b/pkg/userconfig/userconfig.go index b1b62e3bf..d1dfffefb 100644 --- a/pkg/userconfig/userconfig.go +++ b/pkg/userconfig/userconfig.go @@ -61,12 +61,54 @@ type Settings struct { // SoundThreshold is the minimum duration in seconds a task must run // before a success sound is played. Defaults to 5 seconds. SoundThreshold int `yaml:"sound_threshold,omitempty"` + // FollowupBehavior controls what happens when the user sends a message + // while the agent is still working. Valid values are "steer" (inject the + // message mid-turn via at the next tool-round boundary) + // and "followup" (enqueue the message as its own undivided turn after the + // current turn completes). Defaults to "steer". + FollowupBehavior string `yaml:"followup_behavior,omitempty"` // Permissions defines global permission patterns applied across all sessions // and agents. These act as user-wide defaults; session-level and agent-level // permissions override them. Permissions *latest.PermissionsConfig `yaml:"permissions,omitempty"` } +// FollowupBehavior values accepted by Settings.FollowupBehavior. +const ( + // FollowupBehaviorSteer injects messages mid-turn via runtime.Steer at the + // next tool-round boundary. This is the default. + FollowupBehaviorSteer = "steer" + // FollowupBehaviorFollowUp enqueues messages via runtime.FollowUp. Each + // queued message gets its own undivided turn after the current one completes. + FollowupBehaviorFollowUp = "followup" +) + +// followupBehaviorOverride is an in-memory override applied by Get so that +// runtime changes to FollowupBehavior (via SetFollowupBehaviorOverride) are +// visible immediately, even while the on-disk config is being updated +// asynchronously by a background save. An empty string means no override. +var ( + followupBehaviorOverrideMu sync.RWMutex + followupBehaviorOverride string +) + +// SetFollowupBehaviorOverride sets an in-memory override for the follow-up +// behavior that is returned by subsequent calls to Get. Callers changing the +// setting at runtime should invoke this synchronously before persisting to +// disk so the new value is visible immediately, without racing the +// background file write. Pass an empty string to clear the override. +func SetFollowupBehaviorOverride(mode string) { + followupBehaviorOverrideMu.Lock() + followupBehaviorOverride = mode + followupBehaviorOverrideMu.Unlock() +} + +func getFollowupBehaviorOverride() string { + followupBehaviorOverrideMu.RLock() + defer followupBehaviorOverrideMu.RUnlock() + return followupBehaviorOverride +} + // DefaultTabTitleMaxLength is the default maximum tab title length when not configured. const DefaultTabTitleMaxLength = 20 @@ -89,6 +131,20 @@ func (s *Settings) GetSound() bool { return s.Sound } +// GetFollowupBehavior returns the configured follow-up behavior, normalizing +// unset or invalid values to FollowupBehaviorSteer (the default). +func (s *Settings) GetFollowupBehavior() string { + if s == nil { + return FollowupBehaviorSteer + } + switch s.FollowupBehavior { + case FollowupBehaviorFollowUp: + return FollowupBehaviorFollowUp + default: + return FollowupBehaviorSteer + } +} + // GetSoundThreshold returns the minimum duration for sound notifications, defaulting to 10s. func (s *Settings) GetSoundThreshold() int { if s == nil || s.SoundThreshold <= 0 { @@ -341,10 +397,20 @@ func (c *Config) GetSettings() *Settings { // Get returns the global user settings from the config file. // Returns an empty Settings if the config file doesn't exist or has no settings. +// +// Any in-memory override set via SetFollowupBehaviorOverride is applied on +// top of the on-disk value so runtime changes are visible immediately, even +// when the disk write is still in flight. func Get() *Settings { + var s *Settings cfg, err := Load() if err != nil { - return &Settings{} + s = &Settings{} + } else { + s = cfg.GetSettings() + } + if ov := getFollowupBehaviorOverride(); ov != "" { + s.FollowupBehavior = ov } - return cfg.GetSettings() + return s } diff --git a/pkg/userconfig/userconfig_test.go b/pkg/userconfig/userconfig_test.go index feb82d21a..66c7b300c 100644 --- a/pkg/userconfig/userconfig_test.go +++ b/pkg/userconfig/userconfig_test.go @@ -890,3 +890,68 @@ func TestConfig_PermissionsRoundTrip(t *testing.T) { assert.Equal(t, original.Settings.Permissions.Deny, loaded.Settings.Permissions.Deny) assert.Equal(t, original.Settings.Permissions.Ask, loaded.Settings.Permissions.Ask) } + +func TestGetFollowupBehavior(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + settings *Settings + want string + }{ + {"nil settings default to steer", nil, FollowupBehaviorSteer}, + {"empty settings default to steer", &Settings{}, FollowupBehaviorSteer}, + {"unset field defaults to steer", &Settings{FollowupBehavior: ""}, FollowupBehaviorSteer}, + {"explicit steer", &Settings{FollowupBehavior: FollowupBehaviorSteer}, FollowupBehaviorSteer}, + {"explicit followup", &Settings{FollowupBehavior: FollowupBehaviorFollowUp}, FollowupBehaviorFollowUp}, + {"typo falls back to steer", &Settings{FollowupBehavior: "steeer"}, FollowupBehaviorSteer}, + {"case-sensitive: FOLLOWUP does not match", &Settings{FollowupBehavior: "FOLLOWUP"}, FollowupBehaviorSteer}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, tt.settings.GetFollowupBehavior()) + }) + } +} + +// TestSetFollowupBehaviorOverride verifies that setting the in-memory override +// makes Get return the new value without waiting for a disk write. This is +// the guarantee /followup-behavior relies on so messages sent right after +// the command are dispatched under the new mode instead of racing the +// asynchronous file save. +// +// Cannot run in parallel because it mutates the package-level override and +// HOME (via the test's config isolation). +func TestSetFollowupBehaviorOverride(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + + // Ensure a clean override at both the start and end of the test so it + // doesn't leak into other non-parallel tests. + SetFollowupBehaviorOverride("") + t.Cleanup(func() { SetFollowupBehaviorOverride("") }) + + // Baseline: no config, no override -> default steer. + assert.Equal(t, FollowupBehaviorSteer, Get().GetFollowupBehavior()) + + // Override to followup without touching disk — Get should return it. + SetFollowupBehaviorOverride(FollowupBehaviorFollowUp) + assert.Equal(t, FollowupBehaviorFollowUp, Get().GetFollowupBehavior()) + + // On-disk value set to steer, but override still wins. + cfg, err := Load() + require.NoError(t, err) + if cfg.Settings == nil { + cfg.Settings = &Settings{} + } + cfg.Settings.FollowupBehavior = FollowupBehaviorSteer + require.NoError(t, cfg.Save()) + assert.Equal(t, FollowupBehaviorFollowUp, Get().GetFollowupBehavior(), + "override should take precedence over on-disk value") + + // Clearing the override returns Get to reading from disk. + SetFollowupBehaviorOverride("") + assert.Equal(t, FollowupBehaviorSteer, Get().GetFollowupBehavior()) +}