@@ -16,9 +16,13 @@ import Logging
1616import NIOCore
1717import NIOHTTP1
1818import NIOPosix
19+ import Synchronization
1920
2021@usableFromInline
2122final actor LambdaRuntimeClient : LambdaRuntimeClientProtocol {
23+ @usableFromInline
24+ var _hasStreamingCustomHeaders = false
25+
2226 @usableFromInline
2327 nonisolated let unownedExecutor : UnownedSerialExecutor
2428
@@ -42,6 +46,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
4246 self . runtimeClient = runtimeClient
4347 }
4448
49+ @usableFromInline
50+ func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
51+ try await self . runtimeClient. writeCustomHeader ( buffer)
52+ }
53+
4554 @usableFromInline
4655 func write( _ buffer: NIOCore . ByteBuffer ) async throws {
4756 try await self . runtimeClient. write ( buffer)
@@ -188,6 +197,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
188197 }
189198 }
190199
200+ private func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
201+ _hasStreamingCustomHeaders = true
202+ try await self . write ( buffer)
203+ }
191204 private func write( _ buffer: NIOCore . ByteBuffer ) async throws {
192205 switch self . lambdaState {
193206 case . idle, . sentResponse:
@@ -210,6 +223,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
210223 }
211224
212225 private func writeAndFinish( _ buffer: NIOCore . ByteBuffer ? ) async throws {
226+ _hasStreamingCustomHeaders = false
213227 switch self . lambdaState {
214228 case . idle, . sentResponse:
215229 throw LambdaRuntimeError ( code: . finishAfterFinishHasBeenSent)
@@ -330,7 +344,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
330344 NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 )
331345 )
332346 try channel. pipeline. syncOperations. addHandler (
333- LambdaChannelHandler ( delegate: self , logger: self . logger, configuration: self . configuration)
347+ LambdaChannelHandler (
348+ delegate: self ,
349+ logger: self . logger,
350+ configuration: self . configuration
351+ )
334352 )
335353 return channel. eventLoop. makeSucceededFuture ( ( ) )
336354 } catch {
@@ -425,13 +443,17 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
425443
426444 }
427445 }
446+ }
428447
448+ func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? = #isolation) async -> Bool {
449+ await self . _hasStreamingCustomHeaders
429450 }
430451}
431452
432453private protocol LambdaChannelHandlerDelegate {
433454 func connectionWillClose( channel: any Channel )
434455 func connectionErrorHappened( _ error: any Error , channel: any Channel )
456+ func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? ) async -> Bool
435457}
436458
437459private final class LambdaChannelHandler < Delegate: LambdaChannelHandlerDelegate > {
@@ -467,10 +489,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
467489 let defaultHeaders : HTTPHeaders
468490 /// These headers must be sent along an invocation or initialization error report
469491 let errorHeaders : HTTPHeaders
470- /// These headers must be sent when streaming a response
492+ /// These headers must be sent when streaming a large response
493+ let largeResponseHeaders : HTTPHeaders
494+ /// These headers must be sent when the handler streams its response
471495 let streamingHeaders : HTTPHeaders
472496
473- init ( delegate: Delegate , logger: Logger , configuration: LambdaRuntimeClient . Configuration ) {
497+ init (
498+ delegate: Delegate ,
499+ logger: Logger ,
500+ configuration: LambdaRuntimeClient . Configuration
501+ ) {
474502 self . delegate = delegate
475503 self . logger = logger
476504 self . configuration = configuration
@@ -483,11 +511,23 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
483511 " user-agent " : . userAgent,
484512 " lambda-runtime-function-error-type " : " Unhandled " ,
485513 ]
486- self . streamingHeaders = [
514+ self . largeResponseHeaders = [
487515 " host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
488516 " user-agent " : . userAgent,
489517 " transfer-encoding " : " chunked " ,
490518 ]
519+ // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html#runtimes-custom-response-streaming
520+ // These are the headers returned by the Runtime to the Lambda Data plane.
521+ // These are not the headers the Lambda Data plane sends to the caller of the Lambda function
522+ // The developer of the function can set the caller's headers in the handler code.
523+ self . streamingHeaders = [
524+ " host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
525+ " user-agent " : . userAgent,
526+ " Lambda-Runtime-Function-Response-Mode " : " streaming " ,
527+ // these are not used by this runtime client at the moment
528+ // FIXME: the eror handling should inject these headers in the streamed response to report mid-stream errors
529+ " Trailer " : " Lambda-Runtime-Function-Error-Type, Lambda-Runtime-Function-Error-Body " ,
530+ ]
491531 }
492532
493533 func nextInvocation( isolation: isolated ( any Actor ) ? = #isolation) async throws -> Invocation {
@@ -625,11 +665,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
625665 // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
626666 let url = Consts . invocationURLPrefix + " / " + requestID + Consts. postResponseURLSuffix
627667
668+ var headers = self . streamingHeaders
669+ if await self . delegate. hasStreamingCustomHeaders ( isolation: #isolation) {
670+ // this headers is required by Function URL when the user sends custom status code or headers
671+ headers. add ( name: " Content-Type " , value: " application/vnd.awslambda.http-integration-response " )
672+ }
628673 let httpRequest = HTTPRequestHead (
629674 version: . http1_1,
630675 method: . POST,
631676 uri: url,
632- headers: self . streamingHeaders
677+ headers: headers
633678 )
634679
635680 context. write ( self . wrapOutboundOut ( . head( httpRequest) ) , promise: nil )
@@ -652,17 +697,13 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
652697
653698 // If we have less than 6MB, we don't want to use the streaming API. If we have more
654699 // than 6MB we must use the streaming mode.
655- let headers : HTTPHeaders =
656- if byteBuffer? . readableBytes ?? 0 < 6_000_000 {
657- [
658- " host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
659- " user-agent " : . userAgent,
660- " content-length " : " \( byteBuffer? . readableBytes ?? 0 ) " ,
661- ]
662- } else {
663- self . streamingHeaders
664- }
665-
700+ var headers : HTTPHeaders !
701+ if byteBuffer? . readableBytes ?? 0 < 6_000_000 {
702+ headers = self . defaultHeaders
703+ headers. add ( name: " content-length " , value: " \( byteBuffer? . readableBytes ?? 0 ) " )
704+ } else {
705+ headers = self . largeResponseHeaders
706+ }
666707 let httpRequest = HTTPRequestHead (
667708 version: . http1_1,
668709 method: . POST,
0 commit comments