Skip to content

Commit 2cc8fb2

Browse files
authored
Merge pull request #3 from uber/optimize-countdownlatch-master
Optimize CountDownLatch
2 parents 70e9f59 + 7ec8343 commit 2cc8fb2

File tree

1 file changed

+46
-61
lines changed

1 file changed

+46
-61
lines changed

Sources/Concurrency/CountDownLatch.swift

Lines changed: 46 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import Foundation
1818

1919
/// A concurrency utility class that allows coordination between threads. A count down latch
2020
/// starts with an initial count. Threads can then decrement the count until it reaches zero,
21-
/// at which point, the suspended waiting thread shall proceed.
21+
/// at which point, the suspended waiting thread shall proceed. A `CountDownLatch` behaves
22+
/// differently from a `DispatchSemaphore` once the latch is open. Unlike a semaphore where
23+
/// subsequent waits would still block the caller thread, once a `CountDownLatch` is open, all
24+
/// subsequent waits can directly passthrough.
2225
public class CountDownLatch {
2326

2427
/// The initial count of the latch.
@@ -29,86 +32,68 @@ public class CountDownLatch {
2932
/// - parameter count: The initial count for the latch.
3033
public init(count: Int) {
3134
assert(count > 0, "CountDownLatch must have an initial count that is greater than 0.")
32-
3335
initialCount = count
34-
countDownValue = count
35-
waitingCount = 0
36+
conditionCount = AtomicInt(initialValue: count)
3637
}
3738

3839
/// Decrements the latch's count, resuming all awaiting threads if the count reaches zero.
40+
///
41+
/// - note: If the latch is already open, invoking this method has no effects.
3942
public func countDown() {
40-
// Use the serial queue to read and write to both the count variables. This allows us to
41-
// ensure thread-safe access, while allowing this method to be invoked without blocking
42-
// or any contension. We cannot use atomic integers to replace the queue, since both of
43-
// count variables need to be updated atomically together, to avoid waiting on the latch
44-
// after the semaphore is signaled.
45-
queue.async {
46-
guard self.countDownValue > 0 else {
47-
return
48-
}
49-
50-
self.countDownValue -= 1
43+
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
44+
// lock to be only acquired at the time when the latch switches from closed to open.
45+
guard conditionCount.value > 0 else {
46+
return
47+
}
5148

52-
if self.countDownValue == 0 {
53-
// Wake up all waiting invocations, not just threads. We cannot rely on the returned
54-
// value from dispatch_semaphore_signal since it returns true as long as any thread
55-
// is woken, momentarily. When the same thread invokes await multiple times, semaphore
56-
// signal method returns true even if it only unblocks the first await.
57-
while self.waitingCount > 0 {
58-
self.semaphore.signal()
59-
self.waitingCount -= 1
60-
}
61-
}
49+
let newValue = conditionCount.decrementAndGet()
50+
// Check for <= since multiple threads can perform decrements concurrently.
51+
if newValue <= 0 {
52+
condition.lock()
53+
condition.broadcast()
54+
condition.unlock()
6255
}
6356
}
6457

6558
/// Causes the current thread to suspend until the latch counts down to zero.
6659
///
67-
/// - note: If the current count is already zero, this method returns immediately without suspending the current
68-
/// thread.
60+
/// - note: If the current count is already zero, this method returns immediately without
61+
/// suspending the current thread.
6962
///
70-
/// - parameter timeout: The optional timeout value in seconds. If the latch is not counted down to zero before the
71-
/// timeout, this method returns false. If not defined, the current thread will wait forever until the latch is
72-
/// counted down to zero.
73-
/// - returns: true if the latch is counted down to zero. false if the timeout occurred before the latch reaches
74-
/// zero.
63+
/// - parameter timeout: The optional timeout value in seconds. If the latch is not counted
64+
/// down to zero before the timeout, this method returns false. If not defined, the current
65+
/// thread will wait forever until the latch is counted down to zero.
66+
/// - returns: true if the latch is counted down to zero. false if the timeout occurred before
67+
/// the latch reaches zero.
7568
@discardableResult
7669
public func await(timeout: TimeInterval? = nil) -> Bool {
77-
// Only use the queue to access counts but not the semaphore wait, since we need to ensure
78-
// counts are always accessed from the queue's thread. Do not wait on the semaphore inside
79-
// the queue, since the queue is serial, blocking the queue results in deadlock, since the
80-
// semaphore signal will also be blocked.
81-
let alreadyOpen: Bool = queue.sync {
82-
let alreadyOpen = self.countDownValue <= 0
83-
if !alreadyOpen {
84-
self.waitingCount += 1
85-
}
86-
return alreadyOpen
70+
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
71+
// lock to be only acquired at the time when the latch switches from closed to open.
72+
guard conditionCount.value > 0 else {
73+
return true
8774
}
8875

89-
if alreadyOpen {
90-
return true
76+
let deadline: Date
77+
if let timeout = timeout {
78+
deadline = Date().addingTimeInterval(timeout)
9179
} else {
92-
let deadline: DispatchTime
93-
if let timeout = timeout {
94-
deadline = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(timeout * 1000))
95-
} else {
96-
deadline = .distantFuture
97-
}
98-
return self.semaphore.wait(timeout: deadline) == .success
80+
deadline = Date.distantFuture
81+
}
82+
83+
condition.lock()
84+
// Check count again after acquiring the lock, before entering waiting. This ensures the caller
85+
// does not enter waiting after the last counting down occurs.
86+
if conditionCount.value > 0 {
87+
return condition.wait(until: deadline)
9988
}
89+
condition.unlock()
90+
return true
10091
}
10192

10293
// MARK: - Private
10394

104-
private let semaphore = DispatchSemaphore(value: 0)
105-
// Use the serial queue to read and write to both the count variables. This allows us to
106-
// ensure thread-safe access, while allowing this method to be invoked without blocking
107-
// or any contension. We cannot use atomic integers to replace the queue, since both of
108-
// count variables need to be updated atomically together, to avoid waiting on the latch
109-
// after the semaphore is signaled.
110-
private let queue = DispatchQueue(label: "CountDownLatch.executeQueue", qos: .userInteractive)
111-
112-
private var countDownValue: Int
113-
private var waitingCount: Int
95+
private let condition = NSCondition()
96+
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
97+
// lock to be only acquired at the time when the latch switches from closed to open.
98+
private let conditionCount: AtomicInt
11499
}

0 commit comments

Comments
 (0)