From a500f39196746d65b4ae9e72556d1388bbebdad3 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Wed, 12 Nov 2025 08:28:07 +0100 Subject: [PATCH 1/6] add support for nonisolated(nonsending) by default --- Package.swift | 4 +- ...RuntimeClient+ChannelHandler-pre-6.2.swift | 491 ++++++++++++++++++ .../LambdaRuntimeClient+ChannelHandler.swift | 9 +- 3 files changed, 497 insertions(+), 7 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler-pre-6.2.swift diff --git a/Package.swift b/Package.swift index 99ea9667..4cce3abd 100644 --- a/Package.swift +++ b/Package.swift @@ -6,7 +6,9 @@ let defaultSwiftSettings: [SwiftSetting] = [ .enableExperimentalFeature( "AvailabilityMacro=LambdaSwift 2.0:macOS 15.0" - ) + ), + .enableUpcomingFeature("NonisolatedNonsendingByDefault"), + .enableUpcomingFeature("InferIsolatedConformances"), ] let package = Package( diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler-pre-6.2.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler-pre-6.2.swift new file mode 100644 index 00000000..be0973f2 --- /dev/null +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler-pre-6.2.swift @@ -0,0 +1,491 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright SwiftAWSLambdaRuntime project authors +// Copyright (c) Amazon.com, Inc. or its affiliates. +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if swift(<6.2) +import Logging +import NIOCore +import NIOHTTP1 +import NIOPosix + +internal protocol LambdaChannelHandlerDelegate { + func connectionWillClose(channel: any Channel) + func connectionErrorHappened(_ error: any Error, channel: any Channel) +} + +@available(LambdaSwift 2.0, *) +internal final class LambdaChannelHandler { + let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix + + enum State { + case disconnected + case connected(ChannelHandlerContext, LambdaState) + case closing + + enum LambdaState { + /// this is the "normal" state. Transitions to `waitingForNextInvocation` + case idle + /// this is the state while we wait for an invocation. A next call is running. + /// Transitions to `waitingForResponse` + case waitingForNextInvocation(CheckedContinuation) + /// The invocation was forwarded to the handler and we wait for a response. + /// Transitions to `sendingResponse` or `sentResponse`. + case waitingForResponse + case sendingResponse + case sentResponse(CheckedContinuation) + } + } + + private var state: State = .disconnected + private var lastError: Error? + private var reusableErrorBuffer: ByteBuffer? 
+ private let logger: Logger + private let delegate: Delegate + private let configuration: LambdaRuntimeClient.Configuration + + /// These are the default headers that must be sent along an invocation + let defaultHeaders: HTTPHeaders + /// These headers must be sent along an invocation or initialization error report + let errorHeaders: HTTPHeaders + /// These headers must be sent when streaming a large response + let largeResponseHeaders: HTTPHeaders + /// These headers must be sent when the handler streams its response + let streamingHeaders: HTTPHeaders + + init( + delegate: Delegate, + logger: Logger, + configuration: LambdaRuntimeClient.Configuration + ) { + self.delegate = delegate + self.logger = logger + self.configuration = configuration + self.defaultHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": .userAgent, + ] + self.errorHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": .userAgent, + "lambda-runtime-function-error-type": "Unhandled", + ] + self.largeResponseHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": .userAgent, + "transfer-encoding": "chunked", + ] + // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html#runtimes-custom-response-streaming + // These are the headers returned by the Runtime to the Lambda Data plane. + // These are not the headers the Lambda Data plane sends to the caller of the Lambda function + // The developer of the function can set the caller's headers in the handler code. + self.streamingHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": .userAgent, + "Lambda-Runtime-Function-Response-Mode": "streaming", + // these are not used by this runtime client at the moment + // FIXME: the eror handling should inject these headers in the streamed response to report mid-stream errors + "Trailer": "Lambda-Runtime-Function-Error-Type, Lambda-Runtime-Function-Error-Body", + ] + } + + func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { + switch self.state { + case .connected(let context, .idle): + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + self.state = .connected(context, .waitingForNextInvocation(continuation)) + self.sendNextRequest(context: context) + } + + case .connected(_, .sendingResponse), + .connected(_, .sentResponse), + .connected(_, .waitingForNextInvocation), + .connected(_, .waitingForResponse), + .closing: + fatalError("Invalid state: \(self.state)") + + case .disconnected: + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + } + } + + func reportError( + isolation: isolated (any Actor)? 
= #isolation, + _ error: any Error, + requestID: String + ) async throws { + switch self.state { + case .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(continuation)) + self.sendReportErrorRequest(requestID: requestID, error: error, context: context) + } + + case .connected(let context, .sendingResponse): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(continuation)) + self.sendResponseStreamingFailure(error: error, context: context) + } + + case .connected(_, .idle), + .connected(_, .sentResponse): + // The final response has already been sent. The only way to report the unhandled error + // now is to log it. Normally this library never logs higher than debug, we make an + // exception here, as there is no other way of reporting the error. + self.logger.error( + "Unhandled error after stream has finished", + metadata: [ + "lambda_request_id": "\(requestID)", + "lambda_error": "\(String(describing: error))", + ] + ) + + case .disconnected: + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .closing: + throw LambdaRuntimeError(code: .connectionToControlPlaneGoingAway) + } + } + + func writeResponseBodyPart( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer, + requestID: String, + hasCustomHeaders: Bool + ) async throws { + switch self.state { + case .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse): + self.state = .connected(context, .sendingResponse) + try await self.sendResponseBodyPart( + byteBuffer, + sendHeadWithRequestID: requestID, + context: context, + hasCustomHeaders: hasCustomHeaders + ) + + case .connected(let context, .sendingResponse): + + precondition(!hasCustomHeaders, "Programming error: Custom headers should not be sent in this state") + + try await self.sendResponseBodyPart( + byteBuffer, + sendHeadWithRequestID: nil, + context: context, + hasCustomHeaders: hasCustomHeaders + ) + + case .connected(_, .idle), + .connected(_, .sentResponse): + throw LambdaRuntimeError(code: .writeAfterFinishHasBeenSent) + + case .disconnected: + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .closing: + throw LambdaRuntimeError(code: .connectionToControlPlaneGoingAway) + } + } + + func finishResponseRequest( + isolation: isolated (any Actor)? 
= #isolation, + finalData: ByteBuffer?, + requestID: String + ) async throws { + switch self.state { + case .connected(_, .idle), + .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(continuation)) + self.sendResponseFinish(finalData, sendHeadWithRequestID: requestID, context: context) + } + + case .connected(let context, .sendingResponse): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(continuation)) + self.sendResponseFinish(finalData, sendHeadWithRequestID: nil, context: context) + } + + case .connected(_, .sentResponse): + throw LambdaRuntimeError(code: .finishAfterFinishHasBeenSent) + + case .disconnected: + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .closing: + throw LambdaRuntimeError(code: .connectionToControlPlaneGoingAway) + } + } + + private func sendResponseBodyPart( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer, + sendHeadWithRequestID: String?, + context: ChannelHandlerContext, + hasCustomHeaders: Bool + ) async throws { + + if let requestID = sendHeadWithRequestID { + // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length. + let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix + + var headers = self.streamingHeaders + if hasCustomHeaders { + // this header is required by Function URL when the user sends custom status code or headers + headers.add(name: "Content-Type", value: "application/vnd.awslambda.http-integration-response") + } + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: headers + ) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + } + + let future = context.write(self.wrapOutboundOut(.body(.byteBuffer(byteBuffer)))) + context.flush() + try await future.get() + } + + private func sendResponseFinish( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer?, + sendHeadWithRequestID: String?, + context: ChannelHandlerContext + ) { + if let requestID = sendHeadWithRequestID { + // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length. + let url = "\(Consts.invocationURLPrefix)/\(requestID)\(Consts.postResponseURLSuffix)" + + // If we have less than 6MB, we don't want to use the streaming API. If we have more + // than 6MB, we must use the streaming mode. + var headers: HTTPHeaders! + if byteBuffer?.readableBytes ?? 0 < 6_000_000 { + headers = self.defaultHeaders + headers.add(name: "content-length", value: "\(byteBuffer?.readableBytes ?? 
0)") + } else { + headers = self.largeResponseHeaders + } + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: headers + ) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + } + + if let byteBuffer { + context.write(self.wrapOutboundOut(.body(.byteBuffer(byteBuffer))), promise: nil) + } + + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendNextRequest(context: ChannelHandlerContext) { + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .GET, + uri: self.nextInvocationPath, + headers: self.defaultHeaders + ) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendReportErrorRequest(requestID: String, error: any Error, context: ChannelHandlerContext) { + // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = "\(Consts.invocationURLPrefix)/\(requestID)\(Consts.postErrorURLSuffix)" + + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: self.errorHeaders + ) + + if self.reusableErrorBuffer == nil { + self.reusableErrorBuffer = context.channel.allocator.buffer(capacity: 1024) + } else { + self.reusableErrorBuffer!.clear() + } + + let errorResponse = ErrorResponse(errorType: "\(type(of: error))", errorMessage: "\(error)") + // TODO: Write this directly into our ByteBuffer + let bytes = errorResponse.toJSONBytes() + self.reusableErrorBuffer!.writeBytes(bytes) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + context.write(self.wrapOutboundOut(.body(.byteBuffer(self.reusableErrorBuffer!))), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendResponseStreamingFailure(error: any Error, context: ChannelHandlerContext) { + let trailers: HTTPHeaders = [ + "Lambda-Runtime-Function-Error-Type": "Unhandled", + "Lambda-Runtime-Function-Error-Body": "Requires base64", + ] + + context.write(self.wrapOutboundOut(.end(trailers)), promise: nil) + context.flush() + } + + func cancelCurrentRequestAndCloseConnection() { + fatalError("Unimplemented") + } +} + +@available(LambdaSwift 2.0, *) +extension LambdaChannelHandler: ChannelInboundHandler { + typealias OutboundIn = Never + typealias InboundIn = NIOHTTPClientResponseFull + typealias OutboundOut = HTTPClientRequestPart + + func handlerAdded(context: ChannelHandlerContext) { + if context.channel.isActive { + self.state = .connected(context, .idle) + } + } + + func channelActive(context: ChannelHandlerContext) { + switch self.state { + case .disconnected: + self.state = .connected(context, .idle) + case .connected: + break + case .closing: + fatalError("Invalid state: \(self.state)") + } + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let response = unwrapInboundIn(data) + + // handle response content + + switch self.state { + case .connected(let context, .waitingForNextInvocation(let continuation)): + do { + let metadata = try InvocationMetadata(headers: response.head.headers) + self.state = .connected(context, .waitingForResponse) + continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? 
ByteBuffer())) + } catch { + self.state = .closing + + self.delegate.connectionWillClose(channel: context.channel) + context.close(promise: nil) + continuation.resume( + throwing: LambdaRuntimeError(code: .invocationMissingMetadata, underlying: error) + ) + } + + case .connected(let context, .sentResponse(let continuation)): + if response.head.status == .accepted { + self.state = .connected(context, .idle) + continuation.resume() + } else { + self.state = .connected(context, .idle) + continuation.resume(throwing: LambdaRuntimeError(code: .unexpectedStatusCodeForRequest)) + } + + case .disconnected, .closing, .connected(_, _): + break + } + + // As defined in RFC 7230 Section 6.3: + // HTTP/1.1 defaults to the use of "persistent connections", allowing + // multiple requests and responses to be carried over a single + // connection. The "close" connection option is used to signal that a + // connection will not persist after the current request/response. HTTP + // implementations SHOULD support persistent connections. + // + // That's why we only assume the connection shall be closed if we receive + // a "connection = close" header. + let serverCloseConnection = + response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) + + let closeConnection = serverCloseConnection || response.head.version != .http1_1 + + if closeConnection { + // If we were succeeding the request promise here directly and closing the connection + // after succeeding the promise we may run into a race condition: + // + // The lambda runtime will ask for the next work item directly after a succeeded post + // response request. The desire for the next work item might be faster than the attempt + // to close the connection. This will lead to a situation where we try to the connection + // but the next request has already been scheduled on the connection that we want to + // close. For this reason we postpone succeeding the promise until the connection has + // been closed. This codepath will only be hit in the very, very unlikely event of the + // Lambda control plane demanding to close connection. (It's more or less only + // implemented to support http1.1 correctly.) This behavior is ensured with the test + // `LambdaTest.testNoKeepAliveServer`. + self.state = .closing + self.delegate.connectionWillClose(channel: context.channel) + context.close(promise: nil) + } + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + self.logger.trace( + "Channel error caught", + metadata: [ + "error": "\(error)" + ] + ) + // pending responses will fail with lastError in channelInactive since we are calling context.close + self.delegate.connectionErrorHappened(error, channel: context.channel) + + self.lastError = error + context.channel.close(promise: nil) + } + + func channelInactive(context: ChannelHandlerContext) { + // fail any pending responses with last error or assume peer disconnected + switch self.state { + case .connected(_, let lambdaState): + switch lambdaState { + case .waitingForNextInvocation(let continuation): + continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) + case .sentResponse(let continuation): + continuation.resume(throwing: self.lastError ?? 
ChannelError.ioOnClosedChannel) + case .idle, .sendingResponse, .waitingForResponse: + break + } + self.state = .disconnected + default: + break + } + + // we don't need to forward channelInactive to the delegate, as the delegate observes the + // closeFuture + context.fireChannelInactive() + } +} +#endif diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler.swift index 6238fac4..06fd0c4a 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient+ChannelHandler.swift @@ -13,6 +13,7 @@ // //===----------------------------------------------------------------------===// +#if swift(>=6.2) import Logging import NIOCore import NIOHTTP1 @@ -98,7 +99,7 @@ internal final class LambdaChannelHandler Invocation { + func nextInvocation() async throws -> Invocation { switch self.state { case .connected(let context, .idle): return try await withCheckedThrowingContinuation { @@ -120,7 +121,6 @@ internal final class LambdaChannelHandler Date: Thu, 13 Nov 2025 09:26:28 +0100 Subject: [PATCH 2/6] add Sendable to LambdaResponseStreamWriter --- Sources/AWSLambdaRuntime/LambdaHandlers.swift | 2 +- ...bdaResponseStreamWriter+HeadersTests.swift | 88 +++++++++---------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaHandlers.swift b/Sources/AWSLambdaRuntime/LambdaHandlers.swift index 9ff33121..3716db73 100644 --- a/Sources/AWSLambdaRuntime/LambdaHandlers.swift +++ b/Sources/AWSLambdaRuntime/LambdaHandlers.swift @@ -48,7 +48,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype { /// A writer object to write the Lambda response stream into. The HTTP response is started lazily. /// before the first call to ``write(_:)`` or ``writeAndFinish(_:)``. -public protocol LambdaResponseStreamWriter { +public protocol LambdaResponseStreamWriter: Sendable { /// Write a response part into the stream. Bytes written are streamed continually. /// - Parameter buffer: The buffer to write. /// - Parameter hasCustomHeaders: If `true`, the response will be sent with custom HTTP status code and headers. 
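Note on the `Sendable` requirement added above: once `LambdaResponseStreamWriter` refines `Sendable`, a test double with mutable recording state can no longer be a plain `final class`, so the mocks in the test diff below become actors and every read of their recorded state gains an `await`. A minimal sketch of that pattern, with hypothetical names (`ResponseWriterSketch` and `RecordingWriter` are illustrative only, not part of this patch):

import NIOCore

// A reduced stand-in for the now-Sendable writer protocol above.
protocol ResponseWriterSketch: Sendable {
    func write(_ buffer: ByteBuffer) async throws
}

// Actors are implicitly Sendable, so an actor mock satisfies the new
// requirement even though it keeps mutable state for assertions.
actor RecordingWriter: ResponseWriterSketch {
    private(set) var writtenBuffers: [ByteBuffer] = []

    func write(_ buffer: ByteBuffer) async throws {
        self.writtenBuffers.append(buffer)
    }
}

func exercise() async throws {
    let writer = RecordingWriter()
    try await writer.write(ByteBuffer(string: "hello"))
    // Actor-isolated state is read with `await`, which is exactly the change
    // applied throughout the tests in the next diff.
    let count = await writer.writtenBuffers.count
    precondition(count == 1)
}
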
diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index c411dd24..ec30ed89 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -36,10 +36,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written (single write operation) - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Verify buffer contains valid JSON - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) #expect(content.contains("\"statusCode\":200")) } @@ -63,10 +63,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written (single write operation) - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Extract JSON from the buffer - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) // Verify all expected fields are present in the JSON @@ -97,10 +97,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response, encoder: customEncoder) // Verify we have exactly 1 buffer written (single write operation) - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Verify JSON content with sorted keys - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) // With sorted keys, "headers" should come before "statusCode" @@ -121,10 +121,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Verify JSON structure - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) // Check expected fields @@ -149,10 +149,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Verify JSON structure - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) // Check expected fields @@ -179,10 +179,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Extract JSON part from the buffer - let buffer = writer.writtenBuffers[0] + let buffer = await writer.writtenBuffers[0] let content = String(buffer: buffer) // Find the JSON part (everything before any null bytes) @@ -213,10 +213,10 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) // Verify we have exactly 1 buffer written - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Get the buffer content - let buffer = writer.writtenBuffers[0] + let buffer = await 
writer.writtenBuffers[0] let content = String(buffer: buffer) // Verify it contains JSON @@ -240,7 +240,7 @@ struct LambdaResponseStreamWriterHeadersTests { } // Verify no data was written when encoding fails - #expect(writer.writtenBuffers.isEmpty) + #expect(await writer.writtenBuffers.isEmpty) } @Test("Write method error propagation") @@ -255,7 +255,7 @@ struct LambdaResponseStreamWriterHeadersTests { } // Verify the writer attempted to write once - #expect(writer.writeCallCount == 1) + #expect(await writer.writeCallCount == 1) } // This test is no longer needed since we only have one write operation now @@ -298,7 +298,7 @@ struct LambdaResponseStreamWriterHeadersTests { } // Verify no data was written when encoding fails - #expect(writer.writtenBuffers.isEmpty) + #expect(await writer.writtenBuffers.isEmpty) } // MARK: - Integration Tests @@ -329,21 +329,21 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeAndFinish(moreBuffer) // Verify the sequence: headers + body + more body - #expect(writer.writtenBuffers.count == 3) - #expect(writer.isFinished == true) + #expect(await writer.writtenBuffers.count == 3) + #expect(await writer.isFinished == true) // Verify headers content - let headersBuffer = writer.writtenBuffers[0] + let headersBuffer = await writer.writtenBuffers[0] let headersContent = String(buffer: headersBuffer) #expect(headersContent.contains("\"statusCode\":200")) #expect(headersContent.contains("\"Content-Type\":\"text/plain\"")) // Verify body content - let firstBodyBuffer = writer.writtenBuffers[1] + let firstBodyBuffer = await writer.writtenBuffers[1] let firstBodyString = String(buffer: firstBodyBuffer) #expect(firstBodyString == "Hello, World!") - let secondBodyBuffer = writer.writtenBuffers[2] + let secondBodyBuffer = await writer.writtenBuffers[2] let secondBodyString = String(buffer: secondBodyBuffer) #expect(secondBodyString == " Additional content.") } @@ -368,16 +368,16 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(secondResponse) // Verify both header writes were successful - #expect(writer.writtenBuffers.count == 2) // One buffer per header write + #expect(await writer.writtenBuffers.count == 2) // One buffer per header write // Verify first header write - let firstBuffer = writer.writtenBuffers[0] + let firstBuffer = await writer.writtenBuffers[0] let firstContent = String(buffer: firstBuffer) #expect(firstContent.contains("\"statusCode\":200")) #expect(firstContent.contains("\"Content-Type\":\"application/json\"")) // Verify second header write - let secondBuffer = writer.writtenBuffers[1] + let secondBuffer = await writer.writtenBuffers[1] let secondContent = String(buffer: secondBuffer) #expect(secondContent.contains("\"statusCode\":201")) #expect(secondContent.contains("\"Location\":\"https://example.com/resource/123\"")) @@ -417,16 +417,16 @@ struct LambdaResponseStreamWriterHeadersTests { } // Verify the complete sequence - #expect(writer.writtenBuffers.count == 5) // 1 header + 4 body chunks - #expect(writer.isFinished == true) + #expect(await writer.writtenBuffers.count == 5) // 1 header + 4 body chunks + #expect(await writer.isFinished == true) // Verify headers were written correctly - let jsonBuffer = writer.writtenBuffers[0] + let jsonBuffer = await writer.writtenBuffers[0] let jsonString = String(buffer: jsonBuffer) #expect(jsonString.contains("\"statusCode\":200")) // Verify body chunks - let bodyChunks = writer.writtenBuffers[1...4].map { String(buffer: $0) } + let bodyChunks = 
await writer.writtenBuffers[1...4].map { String(buffer: $0) } let completeBody = bodyChunks.joined() let expectedBody = #"{"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}"# #expect(completeBody == expectedBody) @@ -440,20 +440,20 @@ struct LambdaResponseStreamWriterHeadersTests { let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) try await basicWriter.writeStatusAndHeaders(response) - #expect(basicWriter.writtenBuffers.count == 1) + #expect(await basicWriter.writtenBuffers.count == 1) // Test with a writer that tracks additional state let trackingWriter = TrackingLambdaResponseStreamWriter() try await trackingWriter.writeStatusAndHeaders(response) - #expect(trackingWriter.writtenBuffers.count == 1) - #expect(trackingWriter.writeCallCount == 1) // Single write operation - #expect(trackingWriter.finishCallCount == 0) + #expect(await trackingWriter.writtenBuffers.count == 1) + #expect(await trackingWriter.writeCallCount == 1) // Single write operation + #expect(await trackingWriter.finishCallCount == 0) // Test with a writer that has custom behavior let customWriter = CustomBehaviorLambdaResponseStreamWriter() try await customWriter.writeStatusAndHeaders(response) - #expect(customWriter.writtenBuffers.count == 1) - #expect(customWriter.customBehaviorTriggered == true) + #expect(await customWriter.writtenBuffers.count == 1) + #expect(await customWriter.customBehaviorTriggered == true) } @Test("Integration: complex scenario with headers, streaming, and finish") @@ -495,12 +495,12 @@ struct LambdaResponseStreamWriterHeadersTests { // Verify the complete sequence // 2 header writes + 3 events + 1 final event = 6 buffers - #expect(writer.writtenBuffers.count == 6) - #expect(writer.isFinished == true) + #expect(await writer.writtenBuffers.count == 6) + #expect(await writer.isFinished == true) // Verify events (we know the first two buffers are headers) let eventBuffers = [ - writer.writtenBuffers[2], writer.writtenBuffers[3], writer.writtenBuffers[4], writer.writtenBuffers[5], + await writer.writtenBuffers[2], await writer.writtenBuffers[3], await writer.writtenBuffers[4], await writer.writtenBuffers[5], ] let eventStrings = eventBuffers.map { String(buffer: $0) } #expect(eventStrings[0] == "data: Event 1\n\n") @@ -522,14 +522,14 @@ struct LambdaResponseStreamWriterHeadersTests { // This should compile and work without issues try await testWithGenericWriter(writer) - #expect(writer.writtenBuffers.count == 1) + #expect(await writer.writtenBuffers.count == 1) // Verify it works with protocol existential let protocolWriter: any LambdaResponseStreamWriter = MockLambdaResponseStreamWriter() try await protocolWriter.writeStatusAndHeaders(response) if let mockWriter = protocolWriter as? 
MockLambdaResponseStreamWriter { - #expect(mockWriter.writtenBuffers.count == 1) + #expect(await mockWriter.writtenBuffers.count == 1) } } } @@ -537,7 +537,7 @@ struct LambdaResponseStreamWriterHeadersTests { // MARK: - Mock Implementation /// Mock implementation of LambdaResponseStreamWriter for testing -final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { +final actor MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var isFinished = false private(set) var hasCustomHeaders = false @@ -576,7 +576,7 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { // MARK: - Error Handling Mock Implementations /// Mock implementation that fails on specific write calls for testing error propagation -final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { +final actor FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var writeCallCount = 0 private(set) var isFinished = false @@ -690,7 +690,7 @@ struct FailingJSONEncoder: LambdaOutputEncoder { // MARK: - Additional Mock Implementations for Integration Tests /// Mock implementation that tracks additional state for integration testing -final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { +final actor TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var writeCallCount = 0 private(set) var finishCallCount = 0 @@ -727,7 +727,7 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { } /// Mock implementation with custom behavior for integration testing -final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter { +final actor CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var customBehaviorTriggered = false private(set) var isFinished = false From 7996cca13e3a52c22cb027d41b01dc78db966c87 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 13 Nov 2025 10:05:05 +0100 Subject: [PATCH 3/6] swift format --- .../LambdaResponseStreamWriter+HeadersTests.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index ec30ed89..1099cdc2 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -500,7 +500,8 @@ struct LambdaResponseStreamWriterHeadersTests { // Verify events (we know the first two buffers are headers) let eventBuffers = [ - await writer.writtenBuffers[2], await writer.writtenBuffers[3], await writer.writtenBuffers[4], await writer.writtenBuffers[5], + await writer.writtenBuffers[2], await writer.writtenBuffers[3], await writer.writtenBuffers[4], + await writer.writtenBuffers[5], ] let eventStrings = eventBuffers.map { String(buffer: $0) } #expect(eventStrings[0] == "data: Event 1\n\n") From 19c465fa0c04a7c99f3617ba331e7224f18f6eda Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 13 Nov 2025 10:15:20 +0100 Subject: [PATCH 4/6] turn on NonisolatedNonsendingByDefault on Streamable+Codable example --- Examples/Streaming+Codable/Package.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git 
a/Examples/Streaming+Codable/Package.swift b/Examples/Streaming+Codable/Package.swift index 77e2f45b..ee3d6545 100644 --- a/Examples/Streaming+Codable/Package.swift +++ b/Examples/Streaming+Codable/Package.swift @@ -19,14 +19,16 @@ let package = Package( dependencies: [ .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime"), .product(name: "AWSLambdaEvents", package: "swift-aws-lambda-events"), - ] + ], + swiftSettings: [ .enableExperimentalFeature("NonisolatedNonsendingByDefault")] ), .testTarget( name: "Streaming+CodableTests", dependencies: [ "StreamingCodable", .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime"), - ] + ], + swiftSettings: [ .enableExperimentalFeature("NonisolatedNonsendingByDefault")] ), ] ) From 1fb256118f1ae1327a7ed26dc0c5d39f6accc63c Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 13 Nov 2025 10:28:23 +0100 Subject: [PATCH 5/6] swift-format --- Examples/Streaming+Codable/Package.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Examples/Streaming+Codable/Package.swift b/Examples/Streaming+Codable/Package.swift index ee3d6545..6b4d9687 100644 --- a/Examples/Streaming+Codable/Package.swift +++ b/Examples/Streaming+Codable/Package.swift @@ -20,7 +20,7 @@ let package = Package( .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime"), .product(name: "AWSLambdaEvents", package: "swift-aws-lambda-events"), ], - swiftSettings: [ .enableExperimentalFeature("NonisolatedNonsendingByDefault")] + swiftSettings: [.enableExperimentalFeature("NonisolatedNonsendingByDefault")] ), .testTarget( name: "Streaming+CodableTests", @@ -28,7 +28,7 @@ let package = Package( "StreamingCodable", .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime"), ], - swiftSettings: [ .enableExperimentalFeature("NonisolatedNonsendingByDefault")] + swiftSettings: [.enableExperimentalFeature("NonisolatedNonsendingByDefault")] ), ] ) From d1252b0b63a31a971cbc0cd304a476684665bfad Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Fri, 14 Nov 2025 10:40:27 +0100 Subject: [PATCH 6/6] fix enableUpcomingFeature --- Examples/Streaming+Codable/Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Examples/Streaming+Codable/Package.swift b/Examples/Streaming+Codable/Package.swift index 6b4d9687..91be8bac 100644 --- a/Examples/Streaming+Codable/Package.swift +++ b/Examples/Streaming+Codable/Package.swift @@ -28,7 +28,7 @@ let package = Package( "StreamingCodable", .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime"), ], - swiftSettings: [.enableExperimentalFeature("NonisolatedNonsendingByDefault")] + swiftSettings: [.enableUpcomingFeature("NonisolatedNonsendingByDefault")] ), ] )
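
Background on the isolation change driven by the first patch and the example-package patches: with the `NonisolatedNonsendingByDefault` upcoming feature enabled (Swift 6.2), a nonisolated `async` function runs on its caller's actor by default, so the 6.2 channel handler can drop its explicit `isolation:` parameters, while the new `-pre-6.2.swift` copy keeps the `isolation: isolated (any Actor)? = #isolation` spelling to get the same behavior on older compilers. The final patch also aligns the example's test target on `.enableUpcomingFeature`, matching the main Package.swift. A minimal sketch of the two spellings, with hypothetical names (`workPre62`, `work62`, `Counter` are illustrative only, not part of this patch):

// Pre-6.2 spelling: the caller's isolation is passed explicitly and defaults
// to #isolation, so the body stays on the caller's actor.
func workPre62(isolation: isolated (any Actor)? = #isolation) async -> Int {
    41 + 1
}

// 6.2 spelling: with NonisolatedNonsendingByDefault enabled (or an explicit
// `nonisolated(nonsending)` annotation), a plain nonisolated async function
// already runs on the caller's actor.
func work62() async -> Int {
    41 + 1
}

actor Counter {
    var value = 0

    func bump() async {
        // Both calls stay isolated to Counter for their duration; without the
        // upcoming feature, work62() would instead hop to the global executor.
        value += await workPre62()
        value += await work62()
    }
}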