Skip to content

Commit 5ce5048

Browse files
author
Rudro Samanta
authored
Merge pull request #12 from uber/execution-master
Add task execution package
2 parents 5cc8829 + d9648a8 commit 5ce5048

File tree

8 files changed

+540
-100
lines changed

8 files changed

+540
-100
lines changed

Concurrency.xcodeproj/project.pbxproj

Lines changed: 36 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,16 @@
66
objectVersion = 46;
77
objects = {
88

9-
/* Begin PBXAggregateTarget section */
10-
"Concurrency::ConcurrencyPackageTests::ProductTarget" /* ConcurrencyPackageTests */ = {
11-
isa = PBXAggregateTarget;
12-
buildConfigurationList = OBJ_39 /* Build configuration list for PBXAggregateTarget "ConcurrencyPackageTests" */;
13-
buildPhases = (
14-
);
15-
dependencies = (
16-
OBJ_42 /* PBXTargetDependency */,
17-
);
18-
name = ConcurrencyPackageTests;
19-
productName = ConcurrencyPackageTests;
20-
};
21-
/* End PBXAggregateTarget section */
22-
239
/* Begin PBXBuildFile section */
10+
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */; };
11+
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94840210A4744007E59C8 /* SequenceExecutor.swift */; };
12+
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */; };
13+
41B94846210A4744007E59C8 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94842210A4744007E59C8 /* Task.swift */; };
14+
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */; };
2415
OBJ_27 /* AtomicBool.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_9 /* AtomicBool.swift */; };
2516
OBJ_28 /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_10 /* AtomicInt.swift */; };
2617
OBJ_29 /* AtomicReference.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AtomicReference.swift */; };
2718
OBJ_30 /* CountDownLatch.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_12 /* CountDownLatch.swift */; };
28-
OBJ_37 /* Package.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_6 /* Package.swift */; };
2919
OBJ_48 /* AtomicBoolTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_15 /* AtomicBoolTests.swift */; };
3020
OBJ_49 /* AtomicIntTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_16 /* AtomicIntTests.swift */; };
3121
OBJ_50 /* AtomicReferenceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_17 /* AtomicReferenceTests.swift */; };
@@ -41,16 +31,14 @@
4131
remoteGlobalIDString = "Concurrency::Concurrency";
4232
remoteInfo = Concurrency;
4333
};
44-
412CDD2E20B88EAB00AF5890 /* PBXContainerItemProxy */ = {
45-
isa = PBXContainerItemProxy;
46-
containerPortal = OBJ_1 /* Project object */;
47-
proxyType = 1;
48-
remoteGlobalIDString = "Concurrency::ConcurrencyTests";
49-
remoteInfo = ConcurrencyTests;
50-
};
5134
/* End PBXContainerItemProxy section */
5235

5336
/* Begin PBXFileReference section */
37+
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SerialSequenceExecutor.swift; sourceTree = "<group>"; };
38+
41B94840210A4744007E59C8 /* SequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SequenceExecutor.swift; sourceTree = "<group>"; };
39+
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutor.swift; sourceTree = "<group>"; };
40+
41B94842210A4744007E59C8 /* Task.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Task.swift; sourceTree = "<group>"; };
41+
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutorTests.swift; sourceTree = "<group>"; };
5442
"Concurrency::Concurrency::Product" /* Concurrency.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = Concurrency.framework; sourceTree = BUILT_PRODUCTS_DIR; };
5543
"Concurrency::ConcurrencyTests::Product" /* ConcurrencyTests.xctest */ = {isa = PBXFileReference; lastKnownFileType = file; path = ConcurrencyTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5644
OBJ_10 /* AtomicInt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AtomicInt.swift; sourceTree = "<group>"; };
@@ -83,6 +71,25 @@
8371
/* End PBXFrameworksBuildPhase section */
8472

8573
/* Begin PBXGroup section */
74+
41B9483E210A4744007E59C8 /* Executor */ = {
75+
isa = PBXGroup;
76+
children = (
77+
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */,
78+
41B94840210A4744007E59C8 /* SequenceExecutor.swift */,
79+
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */,
80+
41B94842210A4744007E59C8 /* Task.swift */,
81+
);
82+
path = Executor;
83+
sourceTree = "<group>";
84+
};
85+
41B94847210A4756007E59C8 /* Executor */ = {
86+
isa = PBXGroup;
87+
children = (
88+
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */,
89+
);
90+
path = Executor;
91+
sourceTree = "<group>";
92+
};
8693
OBJ_13 /* Tests */ = {
8794
isa = PBXGroup;
8895
children = (
@@ -94,6 +101,7 @@
94101
OBJ_14 /* ConcurrencyTests */ = {
95102
isa = PBXGroup;
96103
children = (
104+
41B94847210A4756007E59C8 /* Executor */,
97105
OBJ_15 /* AtomicBoolTests.swift */,
98106
OBJ_16 /* AtomicIntTests.swift */,
99107
OBJ_17 /* AtomicReferenceTests.swift */,
@@ -133,6 +141,7 @@
133141
OBJ_8 /* Concurrency */ = {
134142
isa = PBXGroup;
135143
children = (
144+
41B9483E210A4744007E59C8 /* Executor */,
136145
OBJ_9 /* AtomicBool.swift */,
137146
OBJ_10 /* AtomicInt.swift */,
138147
OBJ_11 /* AtomicReference.swift */,
@@ -178,20 +187,6 @@
178187
productReference = "Concurrency::ConcurrencyTests::Product" /* ConcurrencyTests.xctest */;
179188
productType = "com.apple.product-type.bundle.unit-test";
180189
};
181-
"Concurrency::SwiftPMPackageDescription" /* ConcurrencyPackageDescription */ = {
182-
isa = PBXNativeTarget;
183-
buildConfigurationList = OBJ_33 /* Build configuration list for PBXNativeTarget "ConcurrencyPackageDescription" */;
184-
buildPhases = (
185-
OBJ_36 /* Sources */,
186-
);
187-
buildRules = (
188-
);
189-
dependencies = (
190-
);
191-
name = ConcurrencyPackageDescription;
192-
productName = ConcurrencyPackageDescription;
193-
productType = "com.apple.product-type.framework";
194-
};
195190
/* End PBXNativeTarget section */
196191

197192
/* Begin PBXProject section */
@@ -213,8 +208,6 @@
213208
projectRoot = "";
214209
targets = (
215210
"Concurrency::Concurrency" /* Concurrency */,
216-
"Concurrency::SwiftPMPackageDescription" /* ConcurrencyPackageDescription */,
217-
"Concurrency::ConcurrencyPackageTests::ProductTarget" /* ConcurrencyPackageTests */,
218211
"Concurrency::ConcurrencyTests" /* ConcurrencyTests */,
219212
);
220213
};
@@ -225,27 +218,24 @@
225218
isa = PBXSourcesBuildPhase;
226219
buildActionMask = 0;
227220
files = (
221+
41B94846210A4744007E59C8 /* Task.swift in Sources */,
228222
OBJ_27 /* AtomicBool.swift in Sources */,
223+
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */,
224+
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */,
229225
OBJ_28 /* AtomicInt.swift in Sources */,
230226
OBJ_29 /* AtomicReference.swift in Sources */,
227+
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */,
231228
OBJ_30 /* CountDownLatch.swift in Sources */,
232229
);
233230
runOnlyForDeploymentPostprocessing = 0;
234231
};
235-
OBJ_36 /* Sources */ = {
236-
isa = PBXSourcesBuildPhase;
237-
buildActionMask = 0;
238-
files = (
239-
OBJ_37 /* Package.swift in Sources */,
240-
);
241-
runOnlyForDeploymentPostprocessing = 0;
242-
};
243232
OBJ_47 /* Sources */ = {
244233
isa = PBXSourcesBuildPhase;
245234
buildActionMask = 0;
246235
files = (
247236
OBJ_48 /* AtomicBoolTests.swift in Sources */,
248237
OBJ_49 /* AtomicIntTests.swift in Sources */,
238+
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */,
249239
OBJ_50 /* AtomicReferenceTests.swift in Sources */,
250240
OBJ_51 /* CountDownLatchTests.swift in Sources */,
251241
);
@@ -254,11 +244,6 @@
254244
/* End PBXSourcesBuildPhase section */
255245

256246
/* Begin PBXTargetDependency section */
257-
OBJ_42 /* PBXTargetDependency */ = {
258-
isa = PBXTargetDependency;
259-
target = "Concurrency::ConcurrencyTests" /* ConcurrencyTests */;
260-
targetProxy = 412CDD2E20B88EAB00AF5890 /* PBXContainerItemProxy */;
261-
};
262247
OBJ_54 /* PBXTargetDependency */ = {
263248
isa = PBXTargetDependency;
264249
target = "Concurrency::Concurrency" /* Concurrency */;
@@ -336,24 +321,6 @@
336321
};
337322
name = Debug;
338323
};
339-
OBJ_34 /* Debug */ = {
340-
isa = XCBuildConfiguration;
341-
buildSettings = {
342-
LD = /usr/bin/true;
343-
OTHER_SWIFT_FLAGS = "-swift-version 4 -I $(TOOLCHAIN_DIR)/usr/lib/swift/pm/4 -target x86_64-apple-macosx10.10 -sdk /Applications/Xcode.9.3.0.9E145.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.13.sdk";
344-
SWIFT_VERSION = 4.0;
345-
};
346-
name = Debug;
347-
};
348-
OBJ_35 /* Release */ = {
349-
isa = XCBuildConfiguration;
350-
buildSettings = {
351-
LD = /usr/bin/true;
352-
OTHER_SWIFT_FLAGS = "-swift-version 4 -I $(TOOLCHAIN_DIR)/usr/lib/swift/pm/4 -target x86_64-apple-macosx10.10 -sdk /Applications/Xcode.9.3.0.9E145.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.13.sdk";
353-
SWIFT_VERSION = 4.0;
354-
};
355-
name = Release;
356-
};
357324
OBJ_4 /* Release */ = {
358325
isa = XCBuildConfiguration;
359326
buildSettings = {
@@ -375,18 +342,6 @@
375342
};
376343
name = Release;
377344
};
378-
OBJ_40 /* Debug */ = {
379-
isa = XCBuildConfiguration;
380-
buildSettings = {
381-
};
382-
name = Debug;
383-
};
384-
OBJ_41 /* Release */ = {
385-
isa = XCBuildConfiguration;
386-
buildSettings = {
387-
};
388-
name = Release;
389-
};
390345
OBJ_45 /* Debug */ = {
391346
isa = XCBuildConfiguration;
392347
buildSettings = {
@@ -446,24 +401,6 @@
446401
defaultConfigurationIsVisible = 0;
447402
defaultConfigurationName = Release;
448403
};
449-
OBJ_33 /* Build configuration list for PBXNativeTarget "ConcurrencyPackageDescription" */ = {
450-
isa = XCConfigurationList;
451-
buildConfigurations = (
452-
OBJ_34 /* Debug */,
453-
OBJ_35 /* Release */,
454-
);
455-
defaultConfigurationIsVisible = 0;
456-
defaultConfigurationName = Release;
457-
};
458-
OBJ_39 /* Build configuration list for PBXAggregateTarget "ConcurrencyPackageTests" */ = {
459-
isa = XCConfigurationList;
460-
buildConfigurations = (
461-
OBJ_40 /* Debug */,
462-
OBJ_41 /* Release */,
463-
);
464-
defaultConfigurationIsVisible = 0;
465-
defaultConfigurationName = Release;
466-
};
467404
OBJ_44 /* Build configuration list for PBXNativeTarget "ConcurrencyTests" */ = {
468405
isa = XCConfigurationList;
469406
buildConfigurations = (

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ Provides locking-free synchronization of a mutable object reference. It provides
3131
### `CountDownLatch`
3232
A utility class that allows coordination between threads. A count down latch starts with an initial count. Threads can then decrement the count until it reaches zero, at which point, the suspended waiting thread shall proceed. A `CountDownLatch` behaves differently from a `DispatchSemaphore` once the latch is open. Unlike a semaphore where subsequent waits would still block the caller thread, once a `CountDownLatch` is open, all subsequent waits can directly passthrough.
3333

34+
### `ConcurrentSequenceExecutor`
35+
An execution utility that executes sequences of tasks and returns the final result in a highly concurrent environment.
36+
37+
### `SerialSequenceExecutor`
38+
A debugging executor that executes sequences of tasks and returns the final result serially on the caller thread.
39+
3440
## Installation
3541

3642
### Carthage
@@ -88,4 +94,4 @@ Or you can follow the steps above to generate a Xcode project and run tests with
8894

8995

9096
## License
91-
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
97+
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
//
2+
// Copyright (c) 2018. Uber Technologies
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import Foundation
18+
19+
/// An executor that executes sequences of tasks concurrently.
20+
///
21+
/// - seeAlso: `SequenceExecutor`.
22+
/// - seeAlso: `Task`.
23+
public class ConcurrentSequenceExecutor: SequenceExecutor {
24+
25+
/// Initializer.
26+
///
27+
/// - parameter name: The name of the executor.
28+
/// - parameter qos: The quality of service of this executor. This
29+
/// defaults to `userInitiated`.
30+
public init(name: String, qos: DispatchQoS = .userInitiated) {
31+
taskQueue = DispatchQueue(label: "Executor.taskQueue-\(name)", qos: qos, attributes: .concurrent)
32+
}
33+
34+
/// Execute a sequence of tasks concurrently from the given initial task.
35+
///
36+
/// - parameter initialTask: The root task of the sequence of tasks
37+
/// to be executed.
38+
/// - parameter execution: The execution defining the sequence of tasks.
39+
/// When a task completes its execution, this closure is invoked with
40+
/// the task and its produced result. This closure is invoked from
41+
/// multiple threads concurrently, therefore it must be thread-safe.
42+
/// The tasks provided by this closure are executed concurrently.
43+
/// - returns: The execution handle that allows control and monitoring
44+
/// of the sequence of tasks being executed.
45+
public func executeSequence<SequenceResultType>(from initialTask: Task, with execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) -> SequenceExecutionHandle<SequenceResultType> {
46+
let handle: SynchronizedSequenceExecutionHandle<SequenceResultType> = SynchronizedSequenceExecutionHandle()
47+
execute(initialTask, with: handle, execution)
48+
return handle
49+
}
50+
51+
// MARK: - Private
52+
53+
private let taskQueue: DispatchQueue
54+
55+
private func execute<SequenceResultType>(_ task: Task, with sequenceHandle: SynchronizedSequenceExecutionHandle<SequenceResultType>, _ execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) {
56+
taskQueue.async {
57+
guard !sequenceHandle.isCancelled else {
58+
return
59+
}
60+
61+
let result = task.typeErasedExecute()
62+
let nextExecution = execution(task, result)
63+
switch nextExecution {
64+
case .continueSequence(let nextTask):
65+
self.execute(nextTask, with: sequenceHandle, execution)
66+
case .endOfSequence(let result):
67+
sequenceHandle.sequenceDidComplete(with: result)
68+
}
69+
}
70+
}
71+
}
72+
73+
private class SynchronizedSequenceExecutionHandle<SequenceResultType>: SequenceExecutionHandle<SequenceResultType> {
74+
75+
private let latch = CountDownLatch(count: 1)
76+
private let didCancel = AtomicBool(initialValue: false)
77+
78+
// Use a lock to ensure result is properly accessed, since the read
79+
// `await` method may be invoked on a different thread than the write
80+
// `sequenceDidComplete` method.
81+
private let resultLock = NSRecursiveLock()
82+
private var result: SequenceResultType?
83+
84+
fileprivate var isCancelled: Bool {
85+
return didCancel.value
86+
}
87+
88+
fileprivate override func await(withTimeout timeout: TimeInterval?) throws -> SequenceResultType {
89+
let didComplete = latch.await(timeout: timeout)
90+
if !didComplete {
91+
throw SequenceExecutionError.awaitTimeout
92+
}
93+
94+
resultLock.lock()
95+
defer {
96+
resultLock.unlock()
97+
}
98+
// If latch was counted down, the result must have been set. Therefore,
99+
// this forced unwrap is safe.
100+
return result!
101+
}
102+
103+
fileprivate func sequenceDidComplete(with result: SequenceResultType) {
104+
resultLock.lock()
105+
self.result = result
106+
resultLock.unlock()
107+
108+
latch.countDown()
109+
}
110+
111+
fileprivate override func cancel() {
112+
didCancel.compareAndSet(expect: false, newValue: true)
113+
}
114+
}

0 commit comments

Comments
 (0)