Skip to content

Commit 72a692f

Browse files
committed
test: Add tests for NIOThreadPool
1 parent 0906089 commit 72a692f

File tree

1 file changed

+166
-0
lines changed

1 file changed

+166
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2020-2024 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Atomics
16+
import Dispatch
17+
import NIOConcurrencyHelpers
18+
import NIOCore
19+
import Testing
20+
21+
@testable import NIOAsyncRuntime
22+
23+
@Suite("NIOThreadPoolTest", .timeLimit(.minutes(1)))
24+
class NIOThreadPoolTest {
25+
private func makeEventLoop() -> AsyncEventLoop {
26+
AsyncEventLoop(manualTimeModeForTesting: true)
27+
}
28+
29+
@Test
30+
func testThreadPoolStartsMultipleTimes() throws {
31+
let numberOfThreads = 1
32+
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
33+
pool.start()
34+
defer {
35+
#expect(throws: Never.self) { pool.shutdownGracefully() }
36+
}
37+
38+
let completionGroup = DispatchGroup()
39+
40+
// The lock here is arguably redundant with the dispatchgroup, but let's make
41+
// this test thread-safe even something goes wrong
42+
let threadOne: NIOLockedValueBox<UInt?> = NIOLockedValueBox(UInt?.none)
43+
let threadTwo: NIOLockedValueBox<UInt?> = NIOLockedValueBox(UInt?.none)
44+
45+
let expectedValue: UInt = 1
46+
47+
completionGroup.enter()
48+
pool.submit { s in
49+
precondition(s == .active)
50+
threadOne.withLockedValue { threadOne in
51+
#expect(threadOne == nil)
52+
threadOne = expectedValue
53+
}
54+
completionGroup.leave()
55+
}
56+
57+
// Now start the thread pool again. This must not destroy existing threads, so our thread should be the same.
58+
pool.start()
59+
completionGroup.enter()
60+
pool.submit { s in
61+
precondition(s == .active)
62+
threadTwo.withLockedValue { threadTwo in
63+
#expect(threadTwo == nil)
64+
threadTwo = expectedValue
65+
}
66+
completionGroup.leave()
67+
}
68+
69+
completionGroup.wait()
70+
71+
#expect(threadOne.withLockedValue { $0 } != nil)
72+
#expect(threadTwo.withLockedValue { $0 } != nil)
73+
#expect(threadOne.withLockedValue { $0 } == threadTwo.withLockedValue { $0 })
74+
}
75+
76+
@Test
77+
func testAsyncThreadPool() async throws {
78+
let numberOfThreads = 1
79+
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
80+
pool.start()
81+
do {
82+
let hitCount = ManagedAtomic(false)
83+
try await pool.runIfActive {
84+
hitCount.store(true, ordering: .relaxed)
85+
}
86+
#expect(hitCount.load(ordering: .relaxed) == true)
87+
} catch {}
88+
try await pool.shutdownGracefully()
89+
}
90+
91+
@Test
92+
func testAsyncThreadPoolErrorPropagation() async throws {
93+
struct ThreadPoolError: Error {}
94+
let numberOfThreads = 1
95+
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
96+
pool.start()
97+
do {
98+
try await pool.runIfActive {
99+
throw ThreadPoolError()
100+
}
101+
Issue.record("Should not get here as closure sent to runIfActive threw an error")
102+
} catch {
103+
#expect(error as? ThreadPoolError != nil, "Error thrown should be of type ThreadPoolError")
104+
}
105+
try await pool.shutdownGracefully()
106+
}
107+
108+
@Test
109+
func testAsyncThreadPoolNotActiveError() async throws {
110+
struct ThreadPoolError: Error {}
111+
let numberOfThreads = 1
112+
let pool = NIOThreadPool(numberOfThreads: numberOfThreads)
113+
do {
114+
try await pool.runIfActive {
115+
throw ThreadPoolError()
116+
}
117+
Issue.record("Should not get here as thread pool isn't active")
118+
} catch {
119+
#expect(
120+
error as? CancellationError != nil, "Error thrown should be of type CancellationError")
121+
}
122+
try await pool.shutdownGracefully()
123+
}
124+
125+
@Test
126+
func testAsyncThreadPoolCancellation() async throws {
127+
let pool = NIOThreadPool(numberOfThreads: 1)
128+
pool.start()
129+
130+
await withThrowingTaskGroup(of: Issue.self) { group in
131+
group.cancelAll()
132+
group.addTask {
133+
try await pool.runIfActive {
134+
Issue.record("Should be cancelled before executed")
135+
}
136+
}
137+
138+
do {
139+
try await group.waitForAll()
140+
Issue.record("Expected CancellationError to be thrown")
141+
} catch {
142+
#expect(error is CancellationError)
143+
}
144+
}
145+
146+
try await pool.shutdownGracefully()
147+
}
148+
149+
@Test
150+
func testAsyncShutdownWorks() async throws {
151+
let threadPool = NIOThreadPool(numberOfThreads: 17)
152+
let eventLoop = makeEventLoop()
153+
154+
threadPool.start()
155+
try await threadPool.shutdownGracefully()
156+
157+
let future: EventLoopFuture = threadPool.runIfActive(eventLoop: eventLoop) {
158+
Issue.record("This shouldn't run because the pool is shutdown.")
159+
}
160+
161+
await #expect(throws: (any Error).self) {
162+
try await future.get()
163+
}
164+
165+
}
166+
}

0 commit comments

Comments
 (0)