File tree Expand file tree Collapse file tree 3 files changed +28
-509
lines changed Expand file tree Collapse file tree 3 files changed +28
-509
lines changed Original file line number Diff line number Diff line change @@ -64,7 +64,7 @@ pub async fn init_conn(
6464 }
6565 } ;
6666
67- let packet = versioned:: ToServer :: deserialize ( & buf, protocol_version)
67+ let init_packet = versioned:: ToServer :: deserialize ( & buf, protocol_version)
6868 . map_err ( |err| WsError :: InvalidPacket ( err. to_string ( ) ) . build ( ) )
6969 . context ( "failed to deserialize initial packet from client" ) ?;
7070
@@ -74,7 +74,7 @@ pub async fn init_conn(
7474 version,
7575 total_slots,
7676 ..
77- } ) = & packet
77+ } ) = & init_packet
7878 {
7979 // Look up existing runner by key
8080 let existing_runner = ctx
@@ -149,12 +149,12 @@ pub async fn init_conn(
149149
150150 ( name. clone ( ) , runner_id, workflow_id)
151151 } else {
152- tracing:: debug!( ?packet , "invalid initial packet" ) ;
152+ tracing:: debug!( ?init_packet , "invalid initial packet" ) ;
153153 return Err ( WsError :: InvalidInitialPacket ( "must be `ToServer::Init`" ) . build ( ) ) ;
154154 } ;
155155
156156 // Forward to runner wf
157- ctx. signal ( pegboard:: workflows:: runner2:: Forward { inner : packet } )
157+ ctx. signal ( pegboard:: workflows:: runner2:: Forward { inner : init_packet } )
158158 . to_workflow_id ( workflow_id)
159159 . send ( )
160160 . await
Original file line number Diff line number Diff line change @@ -343,9 +343,25 @@ async fn handle_message(
343343 . await
344344 . context ( "failed to handle tunnel message" ) ?;
345345 }
346+ // Forward to actor wf
347+ protocol:: ToServer :: ToServerEvents ( events) => {
348+ let res = ctx. signal ( pegboard:: workflows:: runner2:: Forward {
349+ inner : protocol:: ToServer :: try_from ( msg)
350+ . context ( "failed to convert message for workflow forwarding" ) ?,
351+ } )
352+ . tag ( "actor_id" , actor_id)
353+ . graceful_not_found ( )
354+ . send ( )
355+ . await
356+ . with_context ( || {
357+ format ! ( "failed to forward signal to actor workflow: {}" , actor_id)
358+ } ) ?;
359+ if res. is_none ( ) {
360+ tracing:: warn!( ?actor_id, "failed to send signal to actor workflow, likely already stopped" ) ;
361+ }
362+ }
346363 // Forward to runner wf
347364 protocol:: ToServer :: ToServerInit ( _)
348- | protocol:: ToServer :: ToServerEvents ( _)
349365 | protocol:: ToServer :: ToServerAckCommands ( _)
350366 | protocol:: ToServer :: ToServerStopping => {
351367 ctx. signal ( pegboard:: workflows:: runner2:: Forward {
You can’t perform that action at this time.
0 commit comments