@@ -4,14 +4,14 @@ use crate::{
44 types:: { invoke_request_id, IntoFunctionResponse , LambdaEvent } ,
55 Config , Context , Diagnostic ,
66} ;
7- use futures:: { future :: BoxFuture , stream:: FuturesUnordered } ;
7+ use futures:: stream:: FuturesUnordered ;
88use http_body_util:: BodyExt ;
99use lambda_runtime_api_client:: { BoxError , Client as ApiClient } ;
1010use serde:: { Deserialize , Serialize } ;
1111use std:: { env, fmt, fmt:: Debug , future:: Future , io, sync:: Arc } ;
1212use tokio_stream:: { Stream , StreamExt } ;
1313use tower:: { Layer , Service , ServiceExt } ;
14- use tracing:: { error, info_span, trace, warn, Instrument } ;
14+ use tracing:: { debug , error, info_span, trace, warn, Instrument } ;
1515
1616/* ----------------------------------------- INVOCATION ---------------------------------------- */
1717
@@ -164,6 +164,9 @@ where
164164 trace ! ( "Concurrent mode: _X_AMZN_TRACE_ID is not set; use context.xray_trace_id" ) ;
165165 Self :: run_concurrent_inner ( self . service , self . config , self . client , self . concurrency_limit ) . await
166166 } else {
167+ debug ! (
168+ "Concurrent polling disabled (AWS_LAMBDA_MAX_CONCURRENCY unset or <= 1); falling back to sequential polling"
169+ ) ;
167170 let incoming = incoming ( & self . client ) ;
168171 Self :: run_with_incoming ( self . service , self . config , incoming) . await
169172 }
@@ -180,11 +183,14 @@ where
180183
181184 // Use FuturesUnordered so we can observe worker exits as they happen,
182185 // rather than waiting for all workers to finish (join_all).
183- let mut workers: FuturesUnordered < WorkerJoinFuture > = FuturesUnordered :: new ( ) ;
184- let spawn_worker = |service : S , config : Arc < Config > , client : Arc < ApiClient > | -> WorkerJoinFuture {
185- let handle = tokio:: spawn ( concurrent_worker_loop ( service, config, client) ) ;
186- let task_id = handle. id ( ) ;
187- Box :: pin ( async move { ( task_id, handle. await ) } )
186+ let mut workers: FuturesUnordered < tokio:: task:: JoinHandle < ( tokio:: task:: Id , Result < ( ) , BoxError > ) > > =
187+ FuturesUnordered :: new ( ) ;
188+ let spawn_worker = |service : S , config : Arc < Config > , client : Arc < ApiClient > | {
189+ tokio:: spawn ( async move {
190+ let task_id = tokio:: task:: id ( ) ;
191+ let result = concurrent_worker_loop ( service, config, client) . await ;
192+ ( task_id, result)
193+ } )
188194 } ;
189195 // Spawn one worker per concurrency slot; the last uses the owned service to avoid an extra clone.
190196 for _ in 1 ..limit {
@@ -203,22 +209,20 @@ where
203209 // API client failures, runtime panics, etc.
204210 let mut errors: Vec < WorkerError > = Vec :: new ( ) ;
205211 let mut remaining_workers = limit;
206- while let Some ( ( task_id , result) ) = futures:: StreamExt :: next ( & mut workers) . await {
212+ while let Some ( result) = futures:: StreamExt :: next ( & mut workers) . await {
207213 remaining_workers = remaining_workers. saturating_sub ( 1 ) ;
208214 match result {
209- Ok ( Ok ( ( ) ) ) => {
215+ Ok ( ( task_id , Ok ( ( ) ) ) ) => {
210216 // `concurrent_worker_loop` runs indefinitely, so an Ok return indicates
211217 // an unexpected worker exit; we still decrement because the task is gone.
212- let clean_exit_msg = "Concurrent worker exited cleanly (unexpected - loop should run forever)" ;
213218 error ! (
214219 task_id = %task_id,
215220 remaining_workers,
216- "{}" ,
217- clean_exit_msg
221+ "Concurrent worker exited cleanly (unexpected - loop should run forever)"
218222 ) ;
219223 errors. push ( WorkerError :: CleanExit ( task_id) ) ;
220224 }
221- Ok ( Err ( err) ) => {
225+ Ok ( ( task_id , Err ( err) ) ) => {
222226 error ! (
223227 task_id = %task_id,
224228 error = %err,
@@ -228,6 +232,7 @@ where
228232 errors. push ( WorkerError :: Failure ( task_id, err) ) ;
229233 }
230234 Err ( join_err) => {
235+ let task_id = join_err. id ( ) ;
231236 let err: BoxError = Box :: new ( join_err) ;
232237 error ! (
233238 task_id = %task_id,
@@ -253,14 +258,26 @@ enum WorkerError {
253258 Failure ( tokio:: task:: Id , BoxError ) ,
254259}
255260
256- type WorkerJoinResult = ( tokio:: task:: Id , Result < Result < ( ) , BoxError > , tokio:: task:: JoinError > ) ;
257- type WorkerJoinFuture = BoxFuture < ' static , WorkerJoinResult > ;
258-
/// Collection of the `WorkerError` values gathered from concurrent workers.
///
/// Its `fmt::Display` implementation is what turns this list into the
/// text reported to the caller.
259261#[ derive( Debug ) ]
260262struct ConcurrentWorkerErrors {
// One entry per worker exit that was recorded.
261263 errors : Vec < WorkerError > ,
262264}
263265
/// Serializable shape used when rendering `ConcurrentWorkerErrors` as JSON.
///
/// Built inside the `Display` impl and passed to `serde_json::to_string`.
266+ #[ derive( Serialize ) ]
267+ struct ConcurrentWorkerErrorsPayload < ' a > {
// Summary line describing the overall outcome.
268+ message : & ' a str ,
// Task ids (stringified) of workers that exited cleanly; the key is omitted when empty.
269+ #[ serde( skip_serializing_if = "Vec::is_empty" ) ]
270+ clean : Vec < String > ,
// One entry per worker that failed; the key is omitted when empty.
271+ #[ serde( skip_serializing_if = "Vec::is_empty" ) ]
272+ failures : Vec < WorkerFailurePayload > ,
273+ }
274+
/// Serializable record of a single failed worker: its task id and its error,
/// both stored as strings.
275+ #[ derive( Serialize ) ]
276+ struct WorkerFailurePayload {
// Stringified id of the worker task.
277+ id : String ,
// Stringified error returned by the worker.
278+ err : String ,
279+ }
280+
264281impl fmt:: Display for ConcurrentWorkerErrors {
265282 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
266283 let mut clean = Vec :: new ( ) ;
@@ -272,31 +289,28 @@ impl fmt::Display for ConcurrentWorkerErrors {
272289 }
273290 }
274291
275- if failures. is_empty ( ) && !clean. is_empty ( ) {
276- write ! (
277- f,
278- "all concurrent workers exited cleanly (unexpected - loop should run forever)"
279- ) ?;
280- for task_id in clean {
281- write ! ( f, " [task {task_id}]" ) ?;
282- }
283- return Ok ( ( ) ) ;
284- }
292+ let clean_ids: Vec < String > = clean. iter ( ) . map ( |task_id| task_id. to_string ( ) ) . collect ( ) ;
293+ let failure_entries: Vec < WorkerFailurePayload > = failures
294+ . iter ( )
295+ . map ( |( task_id, err) | WorkerFailurePayload {
296+ id : task_id. to_string ( ) ,
297+ err : err. to_string ( ) ,
298+ } )
299+ . collect ( ) ;
285300
286- write ! ( f, "concurrent workers exited unexpectedly" ) ?;
287- if !clean. is_empty ( ) {
288- write ! ( f, "; clean exits:" ) ?;
289- for task_id in clean {
290- write ! ( f, " [task {task_id}]" ) ?;
291- }
292- }
293- if !failures. is_empty ( ) {
294- write ! ( f, "; failures:" ) ?;
295- for ( task_id, err) in failures {
296- write ! ( f, " [task {task_id}] {err}" ) ?;
297- }
298- }
299- Ok ( ( ) )
301+ let message = if failures. is_empty ( ) && !clean. is_empty ( ) {
302+ "all concurrent workers exited cleanly (unexpected - loop should run forever)"
303+ } else {
304+ "concurrent workers exited unexpectedly"
305+ } ;
306+
307+ let payload = ConcurrentWorkerErrorsPayload {
308+ message,
309+ clean : clean_ids,
310+ failures : failure_entries,
311+ } ;
312+ let json = serde_json:: to_string ( & payload) . map_err ( |_| fmt:: Error ) ?;
313+ write ! ( f, "{json}" )
300314 }
301315}
302316
@@ -384,7 +398,7 @@ fn incoming(
384398}
385399
386400/// Creates a future that polls the `/next` endpoint.
///
/// Builds the request from `NextEventRequest` via `into_req`, returning early
/// with the error if that conversion fails, then awaits `client.call` and
/// returns its result unchanged.
// The two signature lines below are the old (`-`) and new (`+`) versions:
// the new one borrows the `ApiClient` instead of taking an `Arc<ApiClient>` by value.
387- async fn next_event_future ( client : Arc < ApiClient > ) -> Result < http:: Response < hyper:: body:: Incoming > , BoxError > {
401+ async fn next_event_future ( client : & ApiClient ) -> Result < http:: Response < hyper:: body:: Incoming > , BoxError > {
388402 let req = NextEventRequest . into_req ( ) ?;
389403 client. call ( req) . await
390404}
@@ -406,9 +420,9 @@ where
406420 S :: Future : Send ,
407421{
408422 let task_id = tokio:: task:: id ( ) ;
409- let span = info_span ! ( "concurrent_worker_loop " , task_id = %task_id) ;
423+ let span = info_span ! ( "worker " , task_id = %task_id) ;
410424 loop {
411- let event = match next_event_future ( client. clone ( ) ) . await {
425+ let event = match next_event_future ( client. as_ref ( ) ) . instrument ( span . clone ( ) ) . await {
412426 Ok ( event) => event,
413427 Err ( e) => {
414428 warn ! ( task_id = %task_id, error = %e, "Error polling /next, retrying" ) ;
0 commit comments