File tree Expand file tree Collapse file tree 3 files changed +28
-509
lines changed Expand file tree Collapse file tree 3 files changed +28
-509
lines changed Original file line number Diff line number Diff line change @@ -76,7 +76,7 @@ pub async fn init_conn(
7676 }
7777 } ;
7878
79- let packet = versioned:: ToServer :: deserialize ( & buf, protocol_version)
79+ let init_packet = versioned:: ToServer :: deserialize ( & buf, protocol_version)
8080 . map_err ( |err| WsError :: InvalidPacket ( err. to_string ( ) ) . build ( ) )
8181 . context ( "failed to deserialize initial packet from client" ) ?;
8282
@@ -86,7 +86,7 @@ pub async fn init_conn(
8686 version,
8787 total_slots,
8888 ..
89- } ) = & packet
89+ } ) = & init_packet
9090 {
9191 // Look up existing runner by key
9292 let existing_runner = ctx
@@ -161,12 +161,12 @@ pub async fn init_conn(
161161
162162 ( name. clone ( ) , runner_id, workflow_id)
163163 } else {
164- tracing:: debug!( ?packet , "invalid initial packet" ) ;
164+ tracing:: debug!( ?init_packet , "invalid initial packet" ) ;
165165 return Err ( WsError :: InvalidInitialPacket ( "must be `ToServer::Init`" ) . build ( ) ) ;
166166 } ;
167167
168168 // Forward to runner wf
169- ctx. signal ( pegboard:: workflows:: runner2:: Forward { inner : packet } )
169+ ctx. signal ( pegboard:: workflows:: runner2:: Forward { inner : init_packet } )
170170 . to_workflow_id ( workflow_id)
171171 . send ( )
172172 . await
Original file line number Diff line number Diff line change @@ -337,9 +337,25 @@ async fn handle_message(
337337 . await
338338 . context ( "failed to handle tunnel message" ) ?;
339339 }
340+ // Forward to actor wf
341+ protocol:: ToServer :: ToServerEvents ( events) => {
342+ let res = ctx. signal ( pegboard:: workflows:: runner2:: Forward {
343+ inner : protocol:: ToServer :: try_from ( msg)
344+ . context ( "failed to convert message for workflow forwarding" ) ?,
345+ } )
346+ . tag ( "actor_id" , actor_id)
347+ . graceful_not_found ( )
348+ . send ( )
349+ . await
350+ . with_context ( || {
351+ format ! ( "failed to forward signal to actor workflow: {}" , actor_id)
352+ } ) ?;
353+ if res. is_none ( ) {
354+ tracing:: warn!( ?actor_id, "failed to send signal to actor workflow, likely already stopped" ) ;
355+ }
356+ }
340357 // Forward to runner wf
341358 protocol:: ToServer :: ToServerInit ( _)
342- | protocol:: ToServer :: ToServerEvents ( _)
343359 | protocol:: ToServer :: ToServerAckCommands ( _)
344360 | protocol:: ToServer :: ToServerStopping => {
345361 ctx. signal ( pegboard:: workflows:: runner2:: Forward {
You can’t perform that action at this time.
0 commit comments