Skip to content

Commit f7f5fcb

Browse files
committed
[app-server] feat: add thread_id and turn_id to item and error notifications
1 parent fc2ff62 commit f7f5fcb

File tree

2 files changed

+131
-17
lines changed

2 files changed

+131
-17
lines changed

codex-rs/app-server-protocol/src/protocol/v2.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,8 @@ pub struct TurnError {
686686
#[ts(export_to = "v2/")]
687687
pub struct ErrorNotification {
688688
pub error: TurnError,
689+
pub thread_id: String,
690+
pub turn_id: String,
689691
}
690692

691693
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -1021,13 +1023,17 @@ pub struct TurnCompletedNotification {
10211023
#[ts(export_to = "v2/")]
10221024
pub struct ItemStartedNotification {
10231025
pub item: ThreadItem,
1026+
pub thread_id: String,
1027+
pub turn_id: String,
10241028
}
10251029

10261030
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
10271031
#[serde(rename_all = "camelCase")]
10281032
#[ts(export_to = "v2/")]
10291033
pub struct ItemCompletedNotification {
10301034
pub item: ThreadItem,
1035+
pub thread_id: String,
1036+
pub turn_id: String,
10311037
}
10321038

10331039
// Item-specific progress notifications

codex-rs/app-server/src/bespoke_event_handling.rs

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ pub(crate) async fn apply_bespoke_event_handling(
117117
changes: patch_changes.clone(),
118118
status: PatchApplyStatus::InProgress,
119119
};
120-
let notification = ItemStartedNotification { item };
120+
let notification = ItemStartedNotification {
121+
thread_id: conversation_id.to_string(),
122+
turn_id: event_id.clone(),
123+
item,
124+
};
121125
outgoing
122126
.send_server_notification(ServerNotification::ItemStarted(notification))
123127
.await;
@@ -200,6 +204,7 @@ pub(crate) async fn apply_bespoke_event_handling(
200204
tokio::spawn(async move {
201205
on_command_execution_request_approval_response(
202206
event_id,
207+
conversation_id,
203208
item_id,
204209
command_string,
205210
cwd,
@@ -214,13 +219,23 @@ pub(crate) async fn apply_bespoke_event_handling(
214219
},
215220
// TODO(celia): properly construct McpToolCall TurnItem in core.
216221
EventMsg::McpToolCallBegin(begin_event) => {
217-
let notification = construct_mcp_tool_call_notification(begin_event).await;
222+
let notification = construct_mcp_tool_call_notification(
223+
begin_event,
224+
conversation_id.to_string(),
225+
event_id.clone(),
226+
)
227+
.await;
218228
outgoing
219229
.send_server_notification(ServerNotification::ItemStarted(notification))
220230
.await;
221231
}
222232
EventMsg::McpToolCallEnd(end_event) => {
223-
let notification = construct_mcp_tool_call_end_notification(end_event).await;
233+
let notification = construct_mcp_tool_call_end_notification(
234+
end_event,
235+
conversation_id.to_string(),
236+
event_id.clone(),
237+
)
238+
.await;
224239
outgoing
225240
.send_server_notification(ServerNotification::ItemCompleted(notification))
226241
.await;
@@ -287,6 +302,8 @@ pub(crate) async fn apply_bespoke_event_handling(
287302
outgoing
288303
.send_server_notification(ServerNotification::Error(ErrorNotification {
289304
error: turn_error,
305+
thread_id: conversation_id.to_string(),
306+
turn_id: event_id.clone(),
290307
}))
291308
.await;
292309
}
@@ -300,11 +317,15 @@ pub(crate) async fn apply_bespoke_event_handling(
300317
outgoing
301318
.send_server_notification(ServerNotification::Error(ErrorNotification {
302319
error: turn_error,
320+
thread_id: conversation_id.to_string(),
321+
turn_id: event_id.clone(),
303322
}))
304323
.await;
305324
}
306325
EventMsg::EnteredReviewMode(review_request) => {
307326
let notification = ItemStartedNotification {
327+
thread_id: conversation_id.to_string(),
328+
turn_id: event_id.clone(),
308329
item: ThreadItem::CodeReview {
309330
id: event_id.clone(),
310331
review: review_request.user_facing_hint,
@@ -316,14 +337,22 @@ pub(crate) async fn apply_bespoke_event_handling(
316337
}
317338
EventMsg::ItemStarted(item_started_event) => {
318339
let item: ThreadItem = item_started_event.item.clone().into();
319-
let notification = ItemStartedNotification { item };
340+
let notification = ItemStartedNotification {
341+
thread_id: conversation_id.to_string(),
342+
turn_id: event_id.clone(),
343+
item,
344+
};
320345
outgoing
321346
.send_server_notification(ServerNotification::ItemStarted(notification))
322347
.await;
323348
}
324349
EventMsg::ItemCompleted(item_completed_event) => {
325350
let item: ThreadItem = item_completed_event.item.clone().into();
326-
let notification = ItemCompletedNotification { item };
351+
let notification = ItemCompletedNotification {
352+
thread_id: conversation_id.to_string(),
353+
turn_id: event_id.clone(),
354+
item,
355+
};
327356
outgoing
328357
.send_server_notification(ServerNotification::ItemCompleted(notification))
329358
.await;
@@ -333,9 +362,12 @@ pub(crate) async fn apply_bespoke_event_handling(
333362
Some(output) => render_review_output_text(&output),
334363
None => REVIEW_FALLBACK_MESSAGE.to_string(),
335364
};
365+
let review_item_id = event_id.clone();
336366
let notification = ItemCompletedNotification {
367+
thread_id: conversation_id.to_string(),
368+
turn_id: event_id.clone(),
337369
item: ThreadItem::CodeReview {
338-
id: event_id,
370+
id: review_item_id,
339371
review: review_text,
340372
},
341373
};
@@ -359,7 +391,11 @@ pub(crate) async fn apply_bespoke_event_handling(
359391
changes: convert_patch_changes(&patch_begin_event.changes),
360392
status: PatchApplyStatus::InProgress,
361393
};
362-
let notification = ItemStartedNotification { item };
394+
let notification = ItemStartedNotification {
395+
thread_id: conversation_id.to_string(),
396+
turn_id: event_id.clone(),
397+
item,
398+
};
363399
outgoing
364400
.send_server_notification(ServerNotification::ItemStarted(notification))
365401
.await;
@@ -381,6 +417,7 @@ pub(crate) async fn apply_bespoke_event_handling(
381417
item_id,
382418
changes,
383419
status,
420+
event_id.clone(),
384421
outgoing.as_ref(),
385422
&turn_summary_store,
386423
)
@@ -406,7 +443,11 @@ pub(crate) async fn apply_bespoke_event_handling(
406443
exit_code: None,
407444
duration_ms: None,
408445
};
409-
let notification = ItemStartedNotification { item };
446+
let notification = ItemStartedNotification {
447+
thread_id: conversation_id.to_string(),
448+
turn_id: event_id.clone(),
449+
item,
450+
};
410451
outgoing
411452
.send_server_notification(ServerNotification::ItemStarted(notification))
412453
.await;
@@ -463,7 +504,11 @@ pub(crate) async fn apply_bespoke_event_handling(
463504
duration_ms: Some(duration_ms),
464505
};
465506

466-
let notification = ItemCompletedNotification { item };
507+
let notification = ItemCompletedNotification {
508+
thread_id: conversation_id.to_string(),
509+
turn_id: event_id.clone(),
510+
item,
511+
};
467512
outgoing
468513
.send_server_notification(ServerNotification::ItemCompleted(notification))
469514
.await;
@@ -521,6 +566,7 @@ async fn complete_file_change_item(
521566
item_id: String,
522567
changes: Vec<FileUpdateChange>,
523568
status: PatchApplyStatus,
569+
turn_id: String,
524570
outgoing: &OutgoingMessageSender,
525571
turn_summary_store: &TurnSummaryStore,
526572
) {
@@ -536,13 +582,19 @@ async fn complete_file_change_item(
536582
changes,
537583
status,
538584
};
539-
let notification = ItemCompletedNotification { item };
585+
let notification = ItemCompletedNotification {
586+
thread_id: conversation_id.to_string(),
587+
turn_id,
588+
item,
589+
};
540590
outgoing
541591
.send_server_notification(ServerNotification::ItemCompleted(notification))
542592
.await;
543593
}
544594

545595
async fn complete_command_execution_item(
596+
conversation_id: ConversationId,
597+
turn_id: String,
546598
item_id: String,
547599
command: String,
548600
cwd: PathBuf,
@@ -560,7 +612,11 @@ async fn complete_command_execution_item(
560612
exit_code: None,
561613
duration_ms: None,
562614
};
563-
let notification = ItemCompletedNotification { item };
615+
let notification = ItemCompletedNotification {
616+
thread_id: conversation_id.to_string(),
617+
turn_id,
618+
item,
619+
};
564620
outgoing
565621
.send_server_notification(ServerNotification::ItemCompleted(notification))
566622
.await;
@@ -798,6 +854,7 @@ async fn on_file_change_request_approval_response(
798854
item_id,
799855
changes,
800856
status,
857+
event_id.clone(),
801858
outgoing.as_ref(),
802859
&turn_summary_store,
803860
)
@@ -818,6 +875,7 @@ async fn on_file_change_request_approval_response(
818875
#[allow(clippy::too_many_arguments)]
819876
async fn on_command_execution_request_approval_response(
820877
event_id: String,
878+
conversation_id: ConversationId,
821879
item_id: String,
822880
command: String,
823881
cwd: PathBuf,
@@ -867,6 +925,8 @@ async fn on_command_execution_request_approval_response(
867925

868926
if let Some(status) = completion_status {
869927
complete_command_execution_item(
928+
conversation_id,
929+
event_id.clone(),
870930
item_id.clone(),
871931
command.clone(),
872932
cwd.clone(),
@@ -891,6 +951,8 @@ async fn on_command_execution_request_approval_response(
891951
/// similar to handle_mcp_tool_call_begin in exec
892952
async fn construct_mcp_tool_call_notification(
893953
begin_event: McpToolCallBeginEvent,
954+
thread_id: String,
955+
turn_id: String,
894956
) -> ItemStartedNotification {
895957
let item = ThreadItem::McpToolCall {
896958
id: begin_event.call_id,
@@ -901,12 +963,18 @@ async fn construct_mcp_tool_call_notification(
901963
result: None,
902964
error: None,
903965
};
904-
ItemStartedNotification { item }
966+
ItemStartedNotification {
967+
thread_id,
968+
turn_id,
969+
item,
970+
}
905971
}
906972

907973
/// simiilar to handle_mcp_tool_call_end in exec
908974
async fn construct_mcp_tool_call_end_notification(
909975
end_event: McpToolCallEndEvent,
976+
thread_id: String,
977+
turn_id: String,
910978
) -> ItemCompletedNotification {
911979
let status = if end_event.is_success() {
912980
McpToolCallStatus::Completed
@@ -939,7 +1007,11 @@ async fn construct_mcp_tool_call_end_notification(
9391007
result,
9401008
error,
9411009
};
942-
ItemCompletedNotification { item }
1010+
ItemCompletedNotification {
1011+
thread_id,
1012+
turn_id,
1013+
item,
1014+
}
9431015
}
9441016

9451017
#[cfg(test)]
@@ -1122,9 +1194,18 @@ mod tests {
11221194
},
11231195
};
11241196

1125-
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
1197+
let thread_id = ConversationId::new().to_string();
1198+
let turn_id = "turn_1".to_string();
1199+
let notification = construct_mcp_tool_call_notification(
1200+
begin_event.clone(),
1201+
thread_id.clone(),
1202+
turn_id.clone(),
1203+
)
1204+
.await;
11261205

11271206
let expected = ItemStartedNotification {
1207+
thread_id,
1208+
turn_id,
11281209
item: ThreadItem::McpToolCall {
11291210
id: begin_event.call_id,
11301211
server: begin_event.invocation.server,
@@ -1267,9 +1348,18 @@ mod tests {
12671348
},
12681349
};
12691350

1270-
let notification = construct_mcp_tool_call_notification(begin_event.clone()).await;
1351+
let thread_id = ConversationId::new().to_string();
1352+
let turn_id = "turn_2".to_string();
1353+
let notification = construct_mcp_tool_call_notification(
1354+
begin_event.clone(),
1355+
thread_id.clone(),
1356+
turn_id.clone(),
1357+
)
1358+
.await;
12711359

12721360
let expected = ItemStartedNotification {
1361+
thread_id,
1362+
turn_id,
12731363
item: ThreadItem::McpToolCall {
12741364
id: begin_event.call_id,
12751365
server: begin_event.invocation.server,
@@ -1308,9 +1398,18 @@ mod tests {
13081398
result: Ok(result),
13091399
};
13101400

1311-
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
1401+
let thread_id = ConversationId::new().to_string();
1402+
let turn_id = "turn_3".to_string();
1403+
let notification = construct_mcp_tool_call_end_notification(
1404+
end_event.clone(),
1405+
thread_id.clone(),
1406+
turn_id.clone(),
1407+
)
1408+
.await;
13121409

13131410
let expected = ItemCompletedNotification {
1411+
thread_id,
1412+
turn_id,
13141413
item: ThreadItem::McpToolCall {
13151414
id: end_event.call_id,
13161415
server: end_event.invocation.server,
@@ -1341,9 +1440,18 @@ mod tests {
13411440
result: Err("boom".to_string()),
13421441
};
13431442

1344-
let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await;
1443+
let thread_id = ConversationId::new().to_string();
1444+
let turn_id = "turn_4".to_string();
1445+
let notification = construct_mcp_tool_call_end_notification(
1446+
end_event.clone(),
1447+
thread_id.clone(),
1448+
turn_id.clone(),
1449+
)
1450+
.await;
13451451

13461452
let expected = ItemCompletedNotification {
1453+
thread_id,
1454+
turn_id,
13471455
item: ThreadItem::McpToolCall {
13481456
id: end_event.call_id,
13491457
server: end_event.invocation.server,

0 commit comments

Comments
 (0)