From c80bbbcf234360f4e31dc312a8dca206356b995b Mon Sep 17 00:00:00 2001 From: Scott Marchant Date: Fri, 14 Nov 2025 14:26:41 -0700 Subject: [PATCH] test: Add tests for MultiThreadedEventLoopGroup in NIOAsyncRuntime. The vast majority of these tests were ported from NIOPosix tests for its own MultiThreadedEventLoopGroup. These tests ensure basic feature parity between the two different implementations. --- .../MultiThreadedEventLoopGroupTests.swift | 209 ++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 Tests/NIOAsyncRuntimeTests/MultiThreadedEventLoopGroupTests.swift diff --git a/Tests/NIOAsyncRuntimeTests/MultiThreadedEventLoopGroupTests.swift b/Tests/NIOAsyncRuntimeTests/MultiThreadedEventLoopGroupTests.swift new file mode 100644 index 0000000..524c89b --- /dev/null +++ b/Tests/NIOAsyncRuntimeTests/MultiThreadedEventLoopGroupTests.swift @@ -0,0 +1,209 @@ +//===----------------------------------------------------------------------===// +// +// Copyright (c) 2025 PassiveLogic, Inc. +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOConcurrencyHelpers +import Testing + +@testable import NIOAsyncRuntime +@testable import NIOCore + +// NOTE: These tests are copied and adapted from NIOPosixTests.EventLoopTest +// They have been modified to use async running, among other things. + +@Suite("MultiThreadedEventLoopGroupTests", .serialized, .timeLimit(.minutes(1))) +final class MultiThreadedEventLoopGroupTests { + @Test + func testLotsOfMixedImmediateAndScheduledTasks() async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + #expect(throws: Never.self) { + try group.syncShutdownGracefully() + } + } + + let eventLoop = group.next() + struct Counter: Sendable { + private var _submitCount = NIOLockedValueBox(0) + var submitCount: Int { + get { self._submitCount.withLockedValue { $0 } } + nonmutating set { self._submitCount.withLockedValue { $0 = newValue } } + } + private var _scheduleCount = NIOLockedValueBox(0) + var scheduleCount: Int { + get { self._scheduleCount.withLockedValue { $0 } } + nonmutating set { self._scheduleCount.withLockedValue { $0 = newValue } } + } + } + + let achieved = Counter() + var immediateTasks = [EventLoopFuture]() + var scheduledTasks = [Scheduled]() + for _ in (0..<100_000) { + if Bool.random() { + let task = eventLoop.submit { + achieved.submitCount += 1 + } + immediateTasks.append(task) + } + if Bool.random() { + let task = eventLoop.scheduleTask(in: .microseconds(10)) { + achieved.scheduleCount += 1 + } + scheduledTasks.append(task) + } + } + + let submitCount = try await EventLoopFuture.whenAllSucceed(immediateTasks, on: eventLoop).map({ + _ in + achieved.submitCount + }).get() + #expect(submitCount == achieved.submitCount) + + let scheduleCount = try await EventLoopFuture.whenAllSucceed( + scheduledTasks.map { $0.futureResult }, + on: eventLoop + ) + .map({ _ in + achieved.scheduleCount + }).get() + #expect(scheduleCount == scheduledTasks.count) + } + + @Test + func testLotsOfMixedImmediateAndScheduledTasksFromEventLoop() async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + #expect(throws: Never.self) { + try group.syncShutdownGracefully() + } + } + + let eventLoop = group.next() + struct Counter: Sendable { + private var _submitCount = NIOLockedValueBox(0) + var submitCount: Int { + get { self._submitCount.withLockedValue { $0 } } + nonmutating set { self._submitCount.withLockedValue { $0 = newValue } } + } + private var _scheduleCount = NIOLockedValueBox(0) + var scheduleCount: Int { + get { self._scheduleCount.withLockedValue { $0 } } + nonmutating set { self._scheduleCount.withLockedValue { $0 = newValue } } + } + } + + let achieved = Counter() + let (immediateTasks, scheduledTasks) = try await eventLoop.submit { + var immediateTasks = [EventLoopFuture]() + var scheduledTasks = [Scheduled]() + for _ in (0..<100_000) { + if Bool.random() { + let task = eventLoop.submit { + achieved.submitCount += 1 + } + immediateTasks.append(task) + } + if Bool.random() { + let task = eventLoop.scheduleTask(in: .microseconds(10)) { + achieved.scheduleCount += 1 + } + scheduledTasks.append(task) + } + } + return (immediateTasks, scheduledTasks) + }.get() + + let submitCount = try await EventLoopFuture.whenAllSucceed(immediateTasks, on: eventLoop) + .map({ _ in + achieved.submitCount + }).get() + #expect(submitCount == achieved.submitCount) + + let scheduleCount = try await EventLoopFuture.whenAllSucceed( + scheduledTasks.map { $0.futureResult }, + on: eventLoop + ) + .map({ _ in + achieved.scheduleCount + }).get() + #expect(scheduleCount == scheduledTasks.count) + } + + @Test + func testImmediateTasksDontGetStuck() async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + #expect(throws: Never.self) { + try group.syncShutdownGracefully() + } + } + + let eventLoop = group.next() + let testEventLoop = MultiThreadedEventLoopGroup.singleton.any() + + let longWait = TimeAmount.seconds(60) + let failDeadline = NIODeadline.now() + longWait + let (immediateTasks, scheduledTask) = try await eventLoop.submit { + // Submit over the 4096 immediate tasks, and some scheduled tasks + // with expiry deadline in (nearish) future. + // We want to make sure immediate tasks, even those that don't fit + // in the first batch, don't get stuck waiting for scheduled task + // expiry + let immediateTasks = (0..<5000).map { _ in + eventLoop.submit {}.hop(to: testEventLoop) + } + let scheduledTask = eventLoop.scheduleTask(in: longWait) { + } + + return (immediateTasks, scheduledTask) + }.get() + + // The immediate tasks should all succeed ~immediately. + // We're testing for a case where the EventLoop gets confused + // into waiting for the scheduled task expiry to complete + // some immediate tasks. + _ = try await EventLoopFuture.whenAllSucceed(immediateTasks, on: testEventLoop).get() + #expect(.now() < failDeadline) + + scheduledTask.cancel() + } + + @Test + func testInEventLoopABAProblem() async throws { + // Older SwiftNIO versions had a bug here, they held onto `pthread_t`s for ever (which is illegal) and then + // used `pthread_equal(pthread_self(), myPthread)`. `pthread_equal` just compares the pointer values which + // means there's an ABA problem here. This test checks that we don't suffer from that issue now. + let allELs: NIOLockedValueBox<[any EventLoop]> = NIOLockedValueBox([]) + + for _ in 0..<100 { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 4) + defer { + #expect(throws: Never.self) { + try group.syncShutdownGracefully() + } + } + for loop in group.makeIterator() { + try! await loop.submit { + allELs.withLockedValue { allELs in + #expect(loop.inEventLoop) + for otherEL in allELs { + #expect( + !otherEL.inEventLoop, + "should only be in \(loop) but turns out also in \(otherEL)" + ) + } + allELs.append(loop) + } + }.get() + } + } + } +}