Skip to content

Commit 9df1a12

Browse files
committed
fix: Fix a variety of issues found in DispatchAsync while implementing new tests.
1 parent 33536b0 commit 9df1a12

File tree

4 files changed

+131
-58
lines changed

4 files changed

+131
-58
lines changed

Sources/DispatchAsync/AsyncSemaphore.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
/// Provides a semaphore implantation in `async` context, with a safe wait method. Provides easy safer replacement
15+
/// Provides a semaphore implantation in `async` context, with a safe wait method. Provides easy safe replacement
1616
/// for DispatchSemaphore usage.
1717
@available(macOS 10.15, *)
1818
actor AsyncSemaphore {
@@ -25,6 +25,7 @@ actor AsyncSemaphore {
2525

2626
func wait() async {
2727
value -= 1
28+
2829
if value >= 0 { return }
2930
await withCheckedContinuation {
3031
waiters.append($0)

Sources/DispatchAsync/DispatchGroup.swift

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,34 +23,29 @@
2323
/// for more details,
2424
@available(macOS 10.15, *)
2525
public class DispatchGroup: @unchecked Sendable {
26-
/// Used to ensure FIFO access to the enter and leave calls
27-
@globalActor
28-
private actor DispatchGroupEntryActor: GlobalActor {
29-
static let shared = DispatchGroupEntryActor()
30-
}
31-
32-
private let group = AsyncGroup()
26+
private let group = _AsyncGroup()
27+
private let queue = FIFOQueue()
3328

3429
public func enter() {
35-
Task { @DispatchGroupEntryActor [] in
36-
// ^--- Ensures serial FIFO entrance/exit into the group
30+
queue.enqueue { [weak self] in
31+
guard let self else { return }
3732
await group.enter()
3833
}
3934
}
4035

4136
public func leave() {
42-
Task { @DispatchGroupEntryActor [] in
43-
// ^--- Ensures serial FIFO entrance/exit into the group
37+
queue.enqueue { [weak self] in
38+
guard let self else { return }
4439
await group.leave()
4540
}
4641
}
4742

48-
public func notify(queue: DispatchQueue, execute work: @escaping @Sendable @convention(block) () -> Void) {
49-
Task { @DispatchGroupEntryActor [] in
50-
// ^--- Ensures serial FIFO entrance/exit into the group
43+
public func notify(queue notificationQueue: DispatchQueue, execute work: @escaping @Sendable @convention(block) () -> Void) {
44+
queue.enqueue { [weak self] in
45+
guard let self else { return }
5146
await group.notify {
5247
await withCheckedContinuation { continuation in
53-
queue.async {
48+
notificationQueue.async {
5449
work()
5550
continuation.resume()
5651
}
@@ -60,7 +55,22 @@ public class DispatchGroup: @unchecked Sendable {
6055
}
6156

6257
func wait() async {
63-
await group.wait()
58+
await withCheckedContinuation { continuation in
59+
queue.enqueue { [weak self] in
60+
guard let self else { return }
61+
// NOTE: We use a task for the wait, because
62+
// otherwise the queue won't execute any more
63+
// tasks until the wait finishes, which is not the
64+
// behavior we want here. We want to enqueue the wait
65+
// in FIFO call order, but then we want to allow the wait
66+
// to be non-blocking for the queue until the last leave
67+
// is called on the group.
68+
Task {
69+
await group.wait()
70+
continuation.resume()
71+
}
72+
}
73+
}
6474
}
6575

6676
public init() {}
@@ -69,10 +79,8 @@ public class DispatchGroup: @unchecked Sendable {
6979
// MARK: - Private Interface for Async Usage -
7080

7181
@available(macOS 10.15, *)
72-
fileprivate actor AsyncGroup {
82+
fileprivate actor _AsyncGroup {
7383
private var taskCount = 0
74-
private var continuation: CheckedContinuation<Void, Never>?
75-
private var isWaiting = false
7684
private var notifyHandlers: [@Sendable () async -> Void] = []
7785

7886
func enter() {
@@ -100,30 +108,22 @@ fileprivate actor AsyncGroup {
100108
return
101109
}
102110

103-
isWaiting = true
104-
105111
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
106-
self.continuation = continuation
112+
notify {
113+
continuation.resume()
114+
}
107115
checkCompletion()
108116
}
109117
}
110118

111119
private func checkCompletion() {
112-
if taskCount <= 0 {
113-
if isWaiting {
114-
continuation?.resume()
115-
continuation = nil
116-
isWaiting = false
117-
}
120+
if taskCount <= 0, !notifyHandlers.isEmpty {
121+
let handlers = notifyHandlers
122+
notifyHandlers.removeAll()
118123

119-
if !notifyHandlers.isEmpty {
120-
let handlers = notifyHandlers
121-
notifyHandlers.removeAll()
122-
123-
for handler in handlers {
124-
Task {
125-
await handler()
126-
}
124+
for handler in handlers {
125+
Task {
126+
await handler()
127127
}
128128
}
129129
}

Sources/DispatchAsync/DispatchQueue.swift

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,26 @@
2323
public class DispatchQueue: @unchecked Sendable {
2424
public static let main = DispatchQueue(isMain: true)
2525

26-
private static let _global = DispatchQueue()
26+
private static let _global = DispatchQueue(attributes: .concurrent)
2727
public static func global() -> DispatchQueue {
2828
Self._global
2929
}
3030

3131
public enum Attributes {
3232
case concurrent
33+
34+
fileprivate var isConcurrent: Bool {
35+
switch self {
36+
case .concurrent:
37+
return true
38+
}
39+
}
3340
}
3441

3542
private let targetQueue: DispatchQueue?
3643

44+
private let serialQueue = FIFOQueue()
45+
3746
/// Indicates whether calling context is running from the main DispatchQueue instance, or some other DispatchQueue instance.
3847
@TaskLocal public static var isMain = false
3948

@@ -56,6 +65,10 @@ public class DispatchQueue: @unchecked Sendable {
5665
attributes: DispatchQueue.Attributes? = nil,
5766
target: DispatchQueue? = nil
5867
) {
68+
if isMain, attributes == .concurrent {
69+
assertionFailure("Should never create a concurrent main queue. Main queue should always be serial.")
70+
}
71+
5972
self.isMain = isMain
6073
self.label = label
6174
self.attributes = attributes
@@ -77,10 +90,60 @@ public class DispatchQueue: @unchecked Sendable {
7790
}
7891
}
7992
} else {
80-
Task {
81-
work()
93+
if attributes?.isConcurrent == true {
94+
Task { // FIFO is not important for concurrent queues, using global task executor here
95+
work()
96+
}
97+
} else {
98+
// We don't need to use a task for enqueing work to a non-main serial queue
99+
// because the enqueue process is very light-weight, and it is important to
100+
// preserve FIFO entry into the queue as much as possible.
101+
serialQueue.enqueue(work)
82102
}
83103
}
84104
}
85105
}
86106
}
107+
108+
/// A tiny FIFO job runner that executes each submitted async closure
109+
/// strictly in the order it was enqueued.
110+
///
111+
/// This is NOT part of the original GCD API. So it is intentionally kept
112+
/// internal for now.
113+
@available(macOS 10.15, *)
114+
actor FIFOQueue {
115+
/// A single item in the stream, which is a block of work that can be completed.
116+
typealias WorkItem = @Sendable () async -> Void
117+
118+
/// The stream’s continuation; lives inside the actor so nobody
119+
/// else can yield into it.
120+
private let continuation: AsyncStream<WorkItem>.Continuation
121+
122+
/// Spin up the stream and the single draining task.
123+
init(bufferingPolicy: AsyncStream<WorkItem>.Continuation.BufferingPolicy = .unbounded) {
124+
let stream: AsyncStream<WorkItem>
125+
(stream, self.continuation) = AsyncStream.makeStream(of: WorkItem.self, bufferingPolicy: bufferingPolicy)
126+
127+
// Dedicated worker that processes work items one-by-one.
128+
Task {
129+
for await work in stream {
130+
// Run each job in order, allowing suspension, and awaiting full
131+
// completion, before running the next work item
132+
await work()
133+
}
134+
}
135+
}
136+
137+
/// Enqueue a new unit of work.
138+
@discardableResult
139+
nonisolated
140+
func enqueue(_ workItem: @escaping WorkItem) -> AsyncStream<WorkItem>.Continuation.YieldResult {
141+
// Never suspends, preserves order
142+
continuation.yield(workItem)
143+
}
144+
145+
deinit {
146+
// Clean shutdown on deinit
147+
continuation.finish()
148+
}
149+
}

Sources/DispatchAsync/DispatchTime.swift

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,39 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
@available(macOS 13, *)
16-
public typealias DispatchTime = ContinuousClock.Instant
16+
public struct DispatchTime {
17+
private let instant: ContinuousClock.Instant
1718

18-
/// The very first time someone tries to reference a `uptimeNanoseconds` or a similar
19-
/// function that references a beginning point, this variable will be initialized as a beginning
20-
/// reference point. This guarantees that all calls to `uptimeNanoseconds` or similar
21-
/// will be 0 or greater.
22-
///
23-
/// By design, it is not possible to related `ContinuousClock.Instant` to
24-
/// `ProcessInfo.processInfo.systemUptime`, and even if one devised such
25-
/// a mechanism, it would open the door for fingerprinting. It's best to let the concept
26-
/// of uptime be relative to previous uptime calls.
27-
@available(macOS 13, *)
28-
private let uptimeBeginning: DispatchTime = DispatchTime.now()
19+
/// The very first time someone intializes a DispatchTime instance, we
20+
/// reference this static let, causing it to be initialized.
21+
///
22+
/// This is the closest we can get to snapshotting the start time of the running
23+
/// executable, without using OS-specific calls. We want
24+
/// to avoid OS-specific calls to maximize portability.
25+
///
26+
/// To keep this robust, we initialize `self.durationSinceBeginning`
27+
/// to this value using a default value, which is guaranteed to run before any
28+
/// initializers run. This guarantees that uptimeBeginning will be the very
29+
/// first
30+
@available(macOS 13, *)
31+
private static let uptimeBeginning: ContinuousClock.Instant = ContinuousClock.Instant.now
32+
33+
/// See documentation for ``uptimeBeginning``. We intentionally
34+
/// use this to guarantee a capture of `now` to uptimeBeginnin BEFORE
35+
/// any DispatchTime instances are initialized.
36+
private let durationSinceUptime = uptimeBeginning.duration(to: ContinuousClock.Instant.now)
37+
38+
public init() {
39+
self.instant = ContinuousClock.Instant.now
40+
}
2941

30-
@available(macOS 13, *)
31-
extension DispatchTime {
3242
public static func now() -> DispatchTime {
33-
now
43+
DispatchTime()
3444
}
3545

3646
public var uptimeNanoseconds: UInt64 {
37-
let beginning = uptimeBeginning
38-
let rightNow = DispatchTime.now()
39-
let uptimeDuration: Int64 = beginning.duration(to: rightNow).nanosecondsClamped
47+
let beginning = DispatchTime.uptimeBeginning
48+
let uptimeDuration: Int64 = beginning.duration(to: self.instant).nanosecondsClamped
4049
guard uptimeDuration >= 0 else {
4150
assertionFailure("It shouldn't be possible to get a negative duration since uptimeBeginning.")
4251
return 0

0 commit comments

Comments
 (0)