From 81385a1cf57e73de961833c69df4d2f2f1105836 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Mon, 24 Nov 2025 20:29:08 -0800 Subject: [PATCH 1/2] [app-server] feat: add thread_id and turn_id to item and error notifications --- .../app-server-protocol/src/protocol/v2.rs | 6 + .../app-server/src/bespoke_event_handling.rs | 143 +++++++++++++++--- 2 files changed, 132 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 13b7b8888c..ca24d1a60b 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -686,6 +686,8 @@ pub struct TurnError { #[ts(export_to = "v2/")] pub struct ErrorNotification { pub error: TurnError, + pub thread_id: String, + pub turn_id: String, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1021,6 +1023,8 @@ pub struct TurnCompletedNotification { #[ts(export_to = "v2/")] pub struct ItemStartedNotification { pub item: ThreadItem, + pub thread_id: String, + pub turn_id: String, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -1028,6 +1032,8 @@ pub struct ItemStartedNotification { #[ts(export_to = "v2/")] pub struct ItemCompletedNotification { pub item: ThreadItem, + pub thread_id: String, + pub turn_id: String, } // Item-specific progress notifications diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 0c2445d854..6f79fb7f5a 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -117,7 +117,11 @@ pub(crate) async fn apply_bespoke_event_handling( changes: patch_changes.clone(), status: PatchApplyStatus::InProgress, }; - let notification = ItemStartedNotification { item }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; @@ -200,6 +204,7 @@ pub(crate) async fn apply_bespoke_event_handling( tokio::spawn(async move { on_command_execution_request_approval_response( event_id, + conversation_id, item_id, command_string, cwd, @@ -214,13 +219,23 @@ pub(crate) async fn apply_bespoke_event_handling( }, // TODO(celia): properly construct McpToolCall TurnItem in core. EventMsg::McpToolCallBegin(begin_event) => { - let notification = construct_mcp_tool_call_notification(begin_event).await; + let notification = construct_mcp_tool_call_notification( + begin_event, + conversation_id.to_string(), + event_id.clone(), + ) + .await; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; } EventMsg::McpToolCallEnd(end_event) => { - let notification = construct_mcp_tool_call_end_notification(end_event).await; + let notification = construct_mcp_tool_call_end_notification( + end_event, + conversation_id.to_string(), + event_id.clone(), + ) + .await; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; @@ -287,6 +302,8 @@ pub(crate) async fn apply_bespoke_event_handling( outgoing .send_server_notification(ServerNotification::Error(ErrorNotification { error: turn_error, + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), })) .await; } @@ -300,11 +317,15 @@ pub(crate) async fn apply_bespoke_event_handling( outgoing .send_server_notification(ServerNotification::Error(ErrorNotification { error: turn_error, + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), })) .await; } EventMsg::EnteredReviewMode(review_request) => { let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), item: ThreadItem::CodeReview { id: event_id.clone(), review: review_request.user_facing_hint, @@ -316,14 +337,22 @@ pub(crate) async fn apply_bespoke_event_handling( } EventMsg::ItemStarted(item_started_event) => { let item: ThreadItem = item_started_event.item.clone().into(); - let notification = ItemStartedNotification { item }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; } EventMsg::ItemCompleted(item_completed_event) => { let item: ThreadItem = item_completed_event.item.clone().into(); - let notification = ItemCompletedNotification { item }; + let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; @@ -333,9 +362,12 @@ pub(crate) async fn apply_bespoke_event_handling( Some(output) => render_review_output_text(&output), None => REVIEW_FALLBACK_MESSAGE.to_string(), }; + let review_item_id = event_id.clone(); let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), item: ThreadItem::CodeReview { - id: event_id, + id: review_item_id, review: review_text, }, }; @@ -359,7 +391,11 @@ pub(crate) async fn apply_bespoke_event_handling( changes: convert_patch_changes(&patch_begin_event.changes), status: PatchApplyStatus::InProgress, }; - let notification = ItemStartedNotification { item }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; @@ -381,6 +417,7 @@ pub(crate) async fn apply_bespoke_event_handling( item_id, changes, status, + event_id.clone(), outgoing.as_ref(), &turn_summary_store, ) @@ -406,7 +443,11 @@ pub(crate) async fn apply_bespoke_event_handling( exit_code: None, duration_ms: None, }; - let notification = ItemStartedNotification { item }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; @@ -463,7 +504,11 @@ pub(crate) async fn apply_bespoke_event_handling( duration_ms: Some(duration_ms), }; - let notification = ItemCompletedNotification { item }; + let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_id.clone(), + item, + }; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; @@ -521,6 +566,7 @@ async fn complete_file_change_item( item_id: String, changes: Vec, status: PatchApplyStatus, + turn_id: String, outgoing: &OutgoingMessageSender, turn_summary_store: &TurnSummaryStore, ) { @@ -536,13 +582,20 @@ async fn complete_file_change_item( changes, status, }; - let notification = ItemCompletedNotification { item }; + let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id, + item, + }; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; } +#[allow(clippy::too_many_arguments)] async fn complete_command_execution_item( + conversation_id: ConversationId, + turn_id: String, item_id: String, command: String, cwd: PathBuf, @@ -560,7 +613,11 @@ async fn complete_command_execution_item( exit_code: None, duration_ms: None, }; - let notification = ItemCompletedNotification { item }; + let notification = ItemCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id, + item, + }; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; @@ -798,6 +855,7 @@ async fn on_file_change_request_approval_response( item_id, changes, status, + event_id.clone(), outgoing.as_ref(), &turn_summary_store, ) @@ -818,6 +876,7 @@ async fn on_file_change_request_approval_response( #[allow(clippy::too_many_arguments)] async fn on_command_execution_request_approval_response( event_id: String, + conversation_id: ConversationId, item_id: String, command: String, cwd: PathBuf, @@ -867,6 +926,8 @@ async fn on_command_execution_request_approval_response( if let Some(status) = completion_status { complete_command_execution_item( + conversation_id, + event_id.clone(), item_id.clone(), command.clone(), cwd.clone(), @@ -891,6 +952,8 @@ async fn on_command_execution_request_approval_response( /// similar to handle_mcp_tool_call_begin in exec async fn construct_mcp_tool_call_notification( begin_event: McpToolCallBeginEvent, + thread_id: String, + turn_id: String, ) -> ItemStartedNotification { let item = ThreadItem::McpToolCall { id: begin_event.call_id, @@ -901,12 +964,18 @@ async fn construct_mcp_tool_call_notification( result: None, error: None, }; - ItemStartedNotification { item } + ItemStartedNotification { + thread_id, + turn_id, + item, + } } /// simiilar to handle_mcp_tool_call_end in exec async fn construct_mcp_tool_call_end_notification( end_event: McpToolCallEndEvent, + thread_id: String, + turn_id: String, ) -> ItemCompletedNotification { let status = if end_event.is_success() { McpToolCallStatus::Completed @@ -939,7 +1008,11 @@ async fn construct_mcp_tool_call_end_notification( result, error, }; - ItemCompletedNotification { item } + ItemCompletedNotification { + thread_id, + turn_id, + item, + } } #[cfg(test)] @@ -1122,9 +1195,18 @@ mod tests { }, }; - let notification = construct_mcp_tool_call_notification(begin_event.clone()).await; + let thread_id = ConversationId::new().to_string(); + let turn_id = "turn_1".to_string(); + let notification = construct_mcp_tool_call_notification( + begin_event.clone(), + thread_id.clone(), + turn_id.clone(), + ) + .await; let expected = ItemStartedNotification { + thread_id, + turn_id, item: ThreadItem::McpToolCall { id: begin_event.call_id, server: begin_event.invocation.server, @@ -1267,9 +1349,18 @@ mod tests { }, }; - let notification = construct_mcp_tool_call_notification(begin_event.clone()).await; + let thread_id = ConversationId::new().to_string(); + let turn_id = "turn_2".to_string(); + let notification = construct_mcp_tool_call_notification( + begin_event.clone(), + thread_id.clone(), + turn_id.clone(), + ) + .await; let expected = ItemStartedNotification { + thread_id, + turn_id, item: ThreadItem::McpToolCall { id: begin_event.call_id, server: begin_event.invocation.server, @@ -1308,9 +1399,18 @@ mod tests { result: Ok(result), }; - let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await; + let thread_id = ConversationId::new().to_string(); + let turn_id = "turn_3".to_string(); + let notification = construct_mcp_tool_call_end_notification( + end_event.clone(), + thread_id.clone(), + turn_id.clone(), + ) + .await; let expected = ItemCompletedNotification { + thread_id, + turn_id, item: ThreadItem::McpToolCall { id: end_event.call_id, server: end_event.invocation.server, @@ -1341,9 +1441,18 @@ mod tests { result: Err("boom".to_string()), }; - let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await; + let thread_id = ConversationId::new().to_string(); + let turn_id = "turn_4".to_string(); + let notification = construct_mcp_tool_call_end_notification( + end_event.clone(), + thread_id.clone(), + turn_id.clone(), + ) + .await; let expected = ItemCompletedNotification { + thread_id, + turn_id, item: ThreadItem::McpToolCall { id: end_event.call_id, server: end_event.invocation.server, From 2dbbf33534b4d38859c86d487e2e9c14e8ed3185 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Mon, 24 Nov 2025 20:45:37 -0800 Subject: [PATCH 2/2] also add thread_id to turn/started and turn/completed --- codex-rs/app-server-protocol/src/protocol/v2.rs | 2 ++ codex-rs/app-server/src/bespoke_event_handling.rs | 7 +++++-- codex-rs/app-server/src/codex_message_processor.rs | 7 +++++-- codex-rs/app-server/tests/suite/v2/turn_interrupt.rs | 4 +++- codex-rs/app-server/tests/suite/v2/turn_start.rs | 2 ++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index ca24d1a60b..3aa362b4cf 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -999,6 +999,7 @@ pub struct ThreadStartedNotification { #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct TurnStartedNotification { + pub thread_id: String, pub turn: Turn, } @@ -1015,6 +1016,7 @@ pub struct Usage { #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub struct TurnCompletedNotification { + pub thread_id: String, pub turn: Turn, } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 6f79fb7f5a..8de64880a4 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -545,11 +545,13 @@ pub(crate) async fn apply_bespoke_event_handling( } async fn emit_turn_completed_with_status( + conversation_id: ConversationId, event_id: String, status: TurnStatus, outgoing: &OutgoingMessageSender, ) { let notification = TurnCompletedNotification { + thread_id: conversation_id.to_string(), turn: Turn { id: event_id, items: vec![], @@ -645,7 +647,7 @@ async fn handle_turn_complete( TurnStatus::Completed }; - emit_turn_completed_with_status(event_id, status, outgoing).await; + emit_turn_completed_with_status(conversation_id, event_id, status, outgoing).await; } async fn handle_turn_interrupted( @@ -656,7 +658,8 @@ async fn handle_turn_interrupted( ) { find_and_remove_turn_summary(conversation_id, turn_summary_store).await; - emit_turn_completed_with_status(event_id, TurnStatus::Interrupted, outgoing).await; + emit_turn_completed_with_status(conversation_id, event_id, TurnStatus::Interrupted, outgoing) + .await; } async fn handle_error( diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index a9f56de115..26c1b8a186 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2473,7 +2473,10 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; // Emit v2 turn/started notification. - let notif = TurnStartedNotification { turn }; + let notif = TurnStartedNotification { + thread_id: params.thread_id, + turn, + }; self.outgoing .send_server_notification(ServerNotification::TurnStarted(notif)) .await; @@ -2531,7 +2534,7 @@ impl CodexMessageProcessor { let response = TurnStartResponse { turn: turn.clone() }; self.outgoing.send_response(request_id, response).await; - let notif = TurnStartedNotification { turn }; + let notif = TurnStartedNotification { thread_id, turn }; self.outgoing .send_server_notification(ServerNotification::TurnStarted(notif)) .await; diff --git a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs index 83389ed1d1..f68ffb899c 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs @@ -88,10 +88,11 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { // Give the command a brief moment to start. tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let thread_id = thread.id.clone(); // Interrupt the in-progress turn by id (v2 API). let interrupt_id = mcp .send_turn_interrupt_request(TurnInterruptParams { - thread_id: thread.id, + thread_id: thread_id.clone(), turn_id: turn.id, }) .await?; @@ -112,6 +113,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { .params .expect("turn/completed params must be present"), )?; + assert_eq!(completed.thread_id, thread_id); assert_eq!(completed.turn.status, TurnStatus::Interrupted); Ok(()) diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index a3e75268c9..2c23fedd94 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -95,6 +95,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<( .await??; let started: TurnStartedNotification = serde_json::from_value(notif.params.expect("params must be present"))?; + assert_eq!(started.thread_id, thread.id); assert_eq!( started.turn.status, codex_app_server_protocol::TurnStatus::InProgress @@ -138,6 +139,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<( .params .expect("turn/completed params must be present"), )?; + assert_eq!(completed.thread_id, thread.id); assert_eq!(completed.turn.status, TurnStatus::Completed); Ok(())