Skip to content

Commit 81385a1

Browse files
committed
[app-server] feat: add thread_id and turn_id to item and error notifications
1 parent fc2ff62 commit 81385a1

File tree

2 files changed

+132
-17
lines changed

2 files changed

+132
-17
lines changed

codex-rs/app-server-protocol/src/protocol/v2.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,8 @@ pub struct TurnError {
686686
#[ts(export_to = "v2/")]
687687
pub struct ErrorNotification {
688688
pub error: TurnError,
689+
pub thread_id: String,
690+
pub turn_id: String,
689691
}
690692

691693
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -1021,13 +1023,17 @@ pub struct TurnCompletedNotification {
10211023
#[ts(export_to = "v2/")]
10221024
pub struct ItemStartedNotification {
10231025
pub item: ThreadItem,
1026+
pub thread_id: String,
1027+
pub turn_id: String,
10241028
}
10251029

10261030
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
10271031
#[serde(rename_all = "camelCase")]
10281032
#[ts(export_to = "v2/")]
10291033
pub struct ItemCompletedNotification {
10301034
pub item: ThreadItem,
1035+
pub thread_id: String,
1036+
pub turn_id: String,
10311037
}
10321038

10331039
// Item-specific progress notifications

codex-rs/app-server/src/bespoke_event_handling.rs

Lines changed: 126 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ pub(crate) async fn apply_bespoke_event_handling(
117117
changes: patch_changes.clone(),
118118
status: PatchApplyStatus::InProgress,
119119
};
120-
let notification = ItemStartedNotification { item };
120+
let notification = ItemStartedNotification {
121+
thread_id: conversation_id.to_string(),
122+
turn_id: event_id.clone(),
123+
item,
124+
};
121125
outgoing
122126
.send_server_notification(ServerNotification::ItemStarted(notification))
123127
.await;
@@ -200,6 +204,7 @@ pub(crate) async fn apply_bespoke_event_handling(
200204
tokio::spawn(async move {
201205
on_command_execution_request_approval_response(
202206
event_id,
207+
conversation_id,
203208
item_id,
204209
command_string,
205210
cwd,
@@ -214,13 +219,23 @@ pub(crate) async fn apply_bespoke_event_handling(
214219
},
215220
// TODO(celia): properly construct McpToolCall TurnItem in core.
216221
EventMsg::McpToolCallBegin(begin_event) => {
217-
let notification = construct_mcp_tool_call_notification(begin_event).await;
222+
let notification = construct_mcp_tool_call_notification(
223+
begin_event,
224+
conversation_id.to_string(),
225+
event_id.clone(),
226+
)
227+
.await;
218228
outgoing
219229
.send_server_notification(ServerNotification::ItemStarted(notification))
220230
.await;
221231
}
222232
EventMsg::McpToolCallEnd(end_event) => {
223-
let notification = construct_mcp_tool_call_end_notification(end_event).await;
233+
let notification = construct_mcp_tool_call_end_notification(
234+
end_event,
235+
conversation_id.to_string(),
236+
event_id.clone(),
237+
)
238+
.await;
224239
outgoing
225240
.send_server_notification(ServerNotification::ItemCompleted(notification))
226241
.await;
@@ -287,6 +302,8 @@ pub(crate) async fn apply_bespoke_event_handling(
287302
outgoing
288303
.send_server_notification(ServerNotification::Error(ErrorNotification {
289304
error: turn_error,
305+
thread_id: conversation_id.to_string(),
306+
turn_id: event_id.clone(),
290307
}))
291308
.await;
292309
}
@@ -300,11 +317,15 @@ pub(crate) async fn apply_bespoke_event_handling(
300317
outgoing
301318
.send_server_notification(ServerNotification::Error(ErrorNotification {
302319
error: turn_error,
320+
thread_id: conversation_id.to_string(),
321+
turn_id: event_id.clone(),
303322
}))
304323
.await;
305324
}
306325
EventMsg::EnteredReviewMode(review_request) => {
307326
let notification = ItemStartedNotification {
327+
thread_id: conversation_id.to_string(),
328+
turn_id: event_id.clone(),
308329
item: ThreadItem::CodeReview {
309330
id: event_id.clone(),
310331
review: review_request.user_facing_hint,
@@ -316,14 +337,22 @@ pub(crate) async fn apply_bespoke_event_handling(
316337
}
317338
EventMsg::ItemStarted(item_started_event) => {
318339
let item: ThreadItem = item_started_event.item.clone().into();
319-
let notification = ItemStartedNotification { item };
340+
let notification = ItemStartedNotification {
341+
thread_id: conversation_id.to_string(),
342+
turn_id: event_id.clone(),
343+
item,
344+
};
320345
outgoing
321346
.send_server_notification(ServerNotification::ItemStarted(notification))
322347
.await;
323348
}
324349
EventMsg::ItemCompleted(item_completed_event) => {
325350
let item: ThreadItem = item_completed_event.item.clone().into();
326-
let notification = ItemCompletedNotification { item };
351+
let notification = ItemCompletedNotification {
352+
thread_id: conversation_id.to_string(),
353+
turn_id: event_id.clone(),
354+
item,
355+
};
327356
outgoing
328357
.send_server_notification(ServerNotification::ItemCompleted(notification))
329358
.await;
@@ -333,9 +362,12 @@ pub(crate) async fn apply_bespoke_event_handling(
333362
Some(output) => render_review_output_text(&output),
334363
None => REVIEW_FALLBACK_MESSAGE.to_string(),
335364
};
365+
let review_item_id = event_id.clone();
336366
let notification = ItemCompletedNotification {
367+
thread_id: conversation_id.to_string(),
368+
turn_id: event_id.clone(),
337369
item: ThreadItem::CodeReview {
338-
id: event_id,
370+
id: review_item_id,
339371
review: review_text,
340372
},
341373
};
@@ -359,7 +391,11 @@ pub(crate) async fn apply_bespoke_event_handling(
359391
changes: convert_patch_changes(&patch_begin_event.changes),
360392
status: PatchApplyStatus::InProgress,
361393
};
362-
let notification = ItemStartedNotification { item };
394+
let notification = ItemStartedNotification {
395+
thread_id: conversation_id.to_string(),
396+
turn_id: event_id.clone(),
397+
item,
398+
};
363399
outgoing
364400
.send_server_notification(ServerNotification::ItemStarted(notification))
365401
.await;
@@ -381,6 +417,7 @@ pub(crate) async fn apply_bespoke_event_handling(
381417
item_id,
382418
changes,
383419
status,
420+
event_id.clone(),
384421
outgoing.as_ref(),
385422
&turn_summary_store,
386423
)
@@ -406,7 +443,11 @@ pub(crate) async fn apply_bespoke_event_handling(
406443
exit_code: None,
407444
duration_ms: None,
408445
};
409-
let notification = ItemStartedNotification { item };
446+
let notification = ItemStartedNotification {
447+
thread_id: conversation_id.to_string(),
448+
turn_id: event_id.clone(),
449+
item,
450+
};
410451
outgoing
411452
.send_server_notification(ServerNotification::ItemStarted(notification))
412453
.await;
@@ -463,7 +504,11 @@ pub(crate) async fn apply_bespoke_event_handling(
463504
duration_ms: Some(duration_ms),
464505
};
465506

466-
let notification = ItemCompletedNotification { item };
507+
let notification = ItemCompletedNotification {
508+
thread_id: conversation_id.to_string(),
509+
turn_id: event_id.clone(),
510+
item,
511+
};
467512
outgoing
468513
.send_server_notification(ServerNotification::ItemCompleted(notification))
469514
.await;
@@ -521,6 +566,7 @@ async fn complete_file_change_item(
521566
item_id: String,
522567
changes: Vec<FileUpdateChange>,
523568
status: PatchApplyStatus,
569+
turn_id: String,
524570
outgoing: &OutgoingMessageSender,
525571
turn_summary_store: &TurnSummaryStore,
526572
) {
@@ -536,13 +582,20 @@ async fn complete_file_change_item(
536582
changes,
537583
status,
538584
};
539-
let notification = ItemCompletedNotification { item };
585+
let notification = ItemCompletedNotification {
586+
thread_id: conversation_id.to_string(),
587+
turn_id,
588+
item,
589+
};
540590
outgoing
541591
.send_server_notification(ServerNotification::ItemCompleted(notification))
542592
.await;
543593
}
544594

595+
#[allow(clippy::too_many_arguments)]
545596
async fn complete_command_execution_item(
597+
conversation_id: ConversationId,
598+
turn_id: String,
546599
item_id: String,
547600
command: String,
548601
cwd: PathBuf,
@@ -560,7 +613,11 @@ async fn complete_command_execution_item(
560613
exit_code: None,
561614
duration_ms: None,
562615
};
563-
let notification = ItemCompletedNotification { item };
616+
let notification = ItemCompletedNotification {
617+
thread_id: conversation_id.to_string(),
618+
turn_id,
619+
item,
620+
};
564621
outgoing
565622
.send_server_notification(ServerNotification::ItemCompleted(notification))
566623
.await;
@@ -798,6 +855,7 @@ async fn on_file_change_request_approval_response(
798855
item_id,
799856
changes,
800857
status,
858+
event_id.clone(),
801859
outgoing.as_ref(),
802860
&turn_summary_store,
803861
)
@@ -818,6 +876,7 @@ async fn on_file_change_request_approval_response(
818876
#[allow(clippy::too_many_arguments)]
819877
async fn on_command_execution_request_approval_response(
820878
event_id: String,
879+
conversation_id: ConversationId,
821880
item_id: String,
822881
command: String,
823882
cwd: PathBuf,
@@ -867,6 +926,8 @@ async fn on_command_execution_request_approval_response(
867926

868927
if let Some(status) = completion_status {
869928
complete_command_execution_item(
929+
conversation_id,
930+
event_id.clone(),
870931
item_id.clone(),
871932
command.clone(),
872933
cwd.clone(),
@@ -891,6 +952,8 @@ async fn on_command_execution_request_approval_response(
891952
/// similar to handle_mcp_tool_call_begin in exec
892953
async fn construct_mcp_tool_call_notification(
893954
begin_event: McpToolCallBeginEvent,
955+
thread_id: String,
956+
turn_id: String,
894957
) -> ItemStartedNotification {
895958
let item = ThreadItem::McpToolCall {
896959
id: begin_event.call_id,
@@ -901,12 +964,18 @@ async fn construct_mcp_tool_call_notification(
901964
result: None,
902965
error: None,
903966
};
904-
ItemStartedNotification { item }
967+
ItemStartedNotification {
968+
thread_id,
969+
turn_id,
970+
item,
971+
}
905972
}
906973

907974
/// simiilar to handle_mcp_tool_call_end in exec
908975
async fn construct_mcp_tool_call_end_notification(
909976
end_event: McpToolCallEndEvent,
977+
thread_id: String,
978+
turn_id: String,
910979
) -> ItemCompletedNotification {
911980
let status = if end_event.is_success() {
912981
McpToolCallStatus::Completed
@@ -939,7 +1008,11 @@ async fn construct_mcp_tool_call_end_notification(
9391008
result,
9401009
error,
9411010
};
942-
ItemCompletedNotification { item }
1011+
ItemCompletedNotification {
1012+
thread_id,
1013+
turn_id,
1014+
item,
1015+
}
9431016
}
9441017

9451018
#[cfg(test)]
@@ -1122,9 +1195,18 @@ mod tests {
11221195
},
11231196
};
11241197

1125-
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
1198+
let thread_id = ConversationId::new().to_string();
1199+
let turn_id = "turn_1".to_string();
1200+
let notification = construct_mcp_tool_call_notification(
1201+
begin_event.clone(),
1202+
thread_id.clone(),
1203+
turn_id.clone(),
1204+
)
1205+
.await;
11261206

11271207
let expected = ItemStartedNotification {
1208+
thread_id,
1209+
turn_id,
11281210
item: ThreadItem::McpToolCall {
11291211
id: begin_event.call_id,
11301212
server: begin_event.invocation.server,
@@ -1267,9 +1349,18 @@ mod tests {
12671349
},
12681350
};
12691351

1270-
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
1352+
let thread_id = ConversationId::new().to_string();
1353+
let turn_id = "turn_2".to_string();
1354+
let notification = construct_mcp_tool_call_notification(
1355+
begin_event.clone(),
1356+
thread_id.clone(),
1357+
turn_id.clone(),
1358+
)
1359+
.await;
12711360

12721361
let expected = ItemStartedNotification {
1362+
thread_id,
1363+
turn_id,
12731364
item: ThreadItem::McpToolCall {
12741365
id: begin_event.call_id,
12751366
server: begin_event.invocation.server,
@@ -1308,9 +1399,18 @@ mod tests {
13081399
result: Ok(result),
13091400
};
13101401

1311-
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
1402+
let thread_id = ConversationId::new().to_string();
1403+
let turn_id = "turn_3".to_string();
1404+
let notification = construct_mcp_tool_call_end_notification(
1405+
end_event.clone(),
1406+
thread_id.clone(),
1407+
turn_id.clone(),
1408+
)
1409+
.await;
13121410

13131411
let expected = ItemCompletedNotification {
1412+
thread_id,
1413+
turn_id,
13141414
item: ThreadItem::McpToolCall {
13151415
id: end_event.call_id,
13161416
server: end_event.invocation.server,
@@ -1341,9 +1441,18 @@ mod tests {
13411441
result: Err("boom".to_string()),
13421442
};
13431443

1344-
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
1444+
let thread_id = ConversationId::new().to_string();
1445+
let turn_id = "turn_4".to_string();
1446+
let notification = construct_mcp_tool_call_end_notification(
1447+
end_event.clone(),
1448+
thread_id.clone(),
1449+
turn_id.clone(),
1450+
)
1451+
.await;
13451452

13461453
let expected = ItemCompletedNotification {
1454+
thread_id,
1455+
turn_id,
13471456
item: ThreadItem::McpToolCall {
13481457
id: end_event.call_id,
13491458
server: end_event.invocation.server,

0 commit comments

Comments
 (0)