From 70568bfc0f3a1e2935e375aaf6978c38cd5ef108 Mon Sep 17 00:00:00 2001 From: Jay Herron Date: Wed, 15 Apr 2026 00:15:40 -0600 Subject: [PATCH] feat: Adds `serial` batch execution option to AsyncDataLoader --- Sources/AsyncDataLoader/DataLoader.swift | 12 ++++- .../AsyncDataLoader/DataLoaderOptions.swift | 31 +++++++++++++ .../DataLoaderTests.swift | 46 +++++++++++++++++++ 3 files changed, 87 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncDataLoader/DataLoader.swift b/Sources/AsyncDataLoader/DataLoader.swift index cd84750..c29bd3e 100644 --- a/Sources/AsyncDataLoader/DataLoader.swift +++ b/Sources/AsyncDataLoader/DataLoader.swift @@ -175,8 +175,16 @@ public actor DataLoader { // If a maxBatchSize was provided and the queue is longer, then segment the // queue into multiple batches, otherwise treat the queue as a single batch. if let maxBatchSize = options.maxBatchSize, maxBatchSize > 0, maxBatchSize < batch.count { - try await batch.chunks(ofCount: maxBatchSize).asyncForEach { slicedBatch in - try await self.executeBatch(batch: Array(slicedBatch)) + let chunks = batch.chunks(ofCount: maxBatchSize) + switch options.executionStrategy.option { + case .parallel: + try await chunks.asyncForEach { slicedBatch in + try await self.executeBatch(batch: Array(slicedBatch)) + } + case .serial: + for chunk in chunks { + try await self.executeBatch(batch: Array(chunk)) + } } } else { try await executeBatch(batch: batch) diff --git a/Sources/AsyncDataLoader/DataLoaderOptions.swift b/Sources/AsyncDataLoader/DataLoaderOptions.swift index bab2c56..fb58c2e 100644 --- a/Sources/AsyncDataLoader/DataLoaderOptions.swift +++ b/Sources/AsyncDataLoader/DataLoaderOptions.swift @@ -20,6 +20,11 @@ public struct DataLoaderOptions: Sendable { /// This is irrelevant if batching is disabled. public let executionPeriod: UInt64? + /// Default `parallel`. Defines the strategy for execution when + /// the execution queue exceeds `maxBatchSize`. + /// This is irrelevant if batching is disabled. + public let executionStrategy: ExecutionStrategy + /// Default `nil`. Produces cache key for a given load key. Useful /// when objects are keys and two objects should be considered equivalent. public let cacheKeyFunction: (@Sendable (Key) -> Key)? @@ -29,12 +34,38 @@ public struct DataLoaderOptions: Sendable { cachingEnabled: Bool = true, maxBatchSize: Int? = nil, executionPeriod: UInt64? = 2_000_000, + executionStrategy: ExecutionStrategy = .parallel, cacheKeyFunction: (@Sendable (Key) -> Key)? = nil ) { self.batchingEnabled = batchingEnabled self.cachingEnabled = cachingEnabled self.executionPeriod = executionPeriod + self.executionStrategy = executionStrategy self.maxBatchSize = maxBatchSize self.cacheKeyFunction = cacheKeyFunction } + + /// The strategy for execution when the execution queue exceeds `maxBatchSize`. + public struct ExecutionStrategy: Sendable { + let option: Option + + private init(option: Option) { + self.option = option + } + + /// Batches within a single execution will be executed simultaneously + public static var parallel: Self { + .init(option: .parallel) + } + + /// Batches within a single execution will be executed one-at-a-time + public static var serial: Self { + .init(option: .serial) + } + + enum Option { + case parallel + case serial + } + } } diff --git a/Tests/AsyncDataLoaderTests/DataLoaderTests.swift b/Tests/AsyncDataLoaderTests/DataLoaderTests.swift index 64e13d1..cbfbd3b 100644 --- a/Tests/AsyncDataLoaderTests/DataLoaderTests.swift +++ b/Tests/AsyncDataLoaderTests/DataLoaderTests.swift @@ -140,6 +140,52 @@ final class DataLoaderTests: XCTestCase { XCTAssertEqual(calls.last?.count, 1) } + func testSerialExecution() async throws { + let loadCalls = Concurrent<[[Int]]>([]) + + let identityLoader = DataLoader( + options: DataLoaderOptions( + batchingEnabled: true, + maxBatchSize: 2, + executionPeriod: nil, + executionStrategy: .serial + ) + ) { keys in + await loadCalls.mutating { $0.append(keys) } + + return keys.map { DataLoaderValue.success($0) } + } + + async let value1 = identityLoader.load(key: 1) + async let value2 = identityLoader.load(key: 2) + async let value3 = identityLoader.load(key: 3) + + try await Task.sleep(nanoseconds: sleepConstant) + + var didFailWithError: Error? + + do { + _ = try await identityLoader.execute() + } catch { + didFailWithError = error + } + + XCTAssertNil(didFailWithError) + + let result1 = try await value1 + let result2 = try await value2 + let result3 = try await value3 + + XCTAssertEqual(result1, 1) + XCTAssertEqual(result2, 2) + XCTAssertEqual(result3, 3) + + let calls = await loadCalls.wrappedValue + + XCTAssertEqual(calls.first?.count, 2) + XCTAssertEqual(calls.last?.count, 1) + } + /// Coalesces identical requests func testCoalescesIdenticalRequests() async throws { let loadCalls = Concurrent<[[Int]]>([])